You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:36 UTC

[helix] branch master updated (a5cddd4 -> 5f6d1eb)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from a5cddd4  Update copyright with apache license to pass rat check (#1212)
     new 813e49e  Fix the flaky test TestRecurringJobQueue.testCreateStoppedQueue (#983)
     new f4cfbc7  Stabilize TestWorkflowTimeout and TestTaskRebalancer (#991)
     new ce100d2  Make the task scheduling decision independent of the PreviousAssignment (#994)
     new c122577  Remove previousAssignment in processTaskWithPendingMessage method (#1040)
     new cb8c696  Remove previousAssignment from FixedTargetTaskAssignmentCalculator (#1061)
     new f11243f  Remove previousAssignment read/write to ZK (#1074)
     new d39b456  Respect Maximum Number Of Attempts for the tasks (#1142)
     new 82c4640  Quota calculation based on CurrentState (#1165)
     new 3501111  Recover Workflow GC Logic (#1181)
     new 5f6d1eb  Change the logs to address new changes

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/helix/common/caches/TaskDataCache.java  |   50 +-
 .../WorkflowControllerDataProvider.java            |   29 +-
 .../helix/controller/stages/AttributeName.java     |    6 +-
 .../controller/stages/CurrentStateOutput.java      |   14 +-
 .../stages/TaskGarbageCollectionStage.java         |   95 +-
 .../stages/task/TaskSchedulingStage.java           |    8 +
 .../java/org/apache/helix/model/IdealState.java    |   18 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  |  234 ++--
 .../helix/task/AssignableInstanceManager.java      |  172 +++
 .../helix/task/DeprecatedTaskRebalancer.java       | 1149 --------------------
 .../task/FixedTargetTaskAssignmentCalculator.java  |   47 +-
 .../helix/task/FixedTargetTaskRebalancer.java      |   60 -
 .../task/GenericTaskAssignmentCalculator.java      |   12 +-
 .../apache/helix/task/GenericTaskRebalancer.java   |   57 -
 .../java/org/apache/helix/task/JobDispatcher.java  |  113 +-
 .../helix/task/TaskAssignmentCalculator.java       |   18 +
 .../main/java/org/apache/helix/task/TaskUtil.java  |  173 +--
 .../ThreadCountBasedTaskAssignmentCalculator.java  |   10 +
 .../org/apache/helix/task/WorkflowDispatcher.java  |   19 +-
 .../helix/controller/stages/TestTaskStage.java     |   89 +-
 .../integration/task/TestForceDeleteWorkflow.java  |    6 +-
 .../integration/task/TestJobQueueCleanUp.java      |   18 +-
 .../task/TestMaxNumberOfAttemptsMasterSwitch.java  |  152 +++
 .../integration/task/TestRecurringJobQueue.java    |   14 +-
 .../helix/integration/task/TestStopWorkflow.java   |    6 +-
 .../helix/integration/task/TestStuckTaskQuota.java |  189 ++++
 .../helix/integration/task/TestTaskRebalancer.java |   14 +-
 .../task/TestTaskSchedulingTwoCurrentStates.java   |   13 -
 .../helix/integration/task/TestTaskStopQueue.java  |   10 +-
 .../task/TestWorkflowContextWithoutConfig.java     |   66 ++
 .../integration/task/TestWorkflowTimeout.java      |   17 +-
 .../helix/task/TestDropTerminalTasksUponReset.java |    3 +-
 .../TestFixedTargetedTaskAssignmentCalculator.java |  288 +++++
 .../helix/task/TestTargetedTaskStateChange.java    |   29 +-
 .../java/org/apache/helix/task/TestTaskUtil.java   |   95 ++
 ...eviousAssignedTaskStatusWithPendingMessage.java |  148 +++
 36 files changed, 1718 insertions(+), 1723 deletions(-)
 delete mode 100644 helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
 delete mode 100644 helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
 delete mode 100644 helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java


[helix] 01/10: Fix the flaky test TestRecurringJobQueue.testCreateStoppedQueue (#983)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 813e49ec8816d6fa2794353155891d341a2512c1
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Mon May 4 10:37:43 2020 -0700

    Fix the flaky test TestRecurringJobQueue.testCreateStoppedQueue (#983)
    
    This commit makes the test wait until the last scheduled workflow has been
    written to the context, so reading it no longer hits a NullPointerException.
---
 .../helix/integration/task/TestRecurringJobQueue.java      | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 6bf5a92..20c53b1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -223,7 +223,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
   }
 
   @Test
-  public void testCreateStoppedQueue() throws InterruptedException {
+  public void testCreateStoppedQueue() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
     // Create a queue
@@ -238,12 +238,14 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     _driver.resume(queueName);
 
-    //TaskTestUtil.pollForWorkflowState(_driver, queueName, );
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    // ensure LAST_SCHEDULED_WORKFLOW field is written to Zookeeper
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      return wCtx.getLastScheduledSingleWorkflow() != null;
+    }, TestHelper.WAIT_DURATION));
 
-    // ensure current schedule is started
-    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    _driver.pollForWorkflowState(scheduledQueue, TaskState.COMPLETED);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    _driver.pollForWorkflowState(wCtx.getLastScheduledSingleWorkflow(), TaskState.COMPLETED);
   }
 
   @Test


[helix] 09/10: Recover Workflow GC Logic (#1181)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3501111a3b9cae27bb7b875fadbf49406a4b6cd0
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Wed Jul 29 10:32:59 2020 -0700

    Recover Workflow GC Logic (#1181)
    
    Recover Workflow Garbage Collection Logic: jobs and workflows to purge are
    computed on the main pipeline thread; the purge itself then runs asynchronously.
---
 .../helix/controller/stages/AttributeName.java     |   6 +-
 .../stages/TaskGarbageCollectionStage.java         |  95 +++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  | 173 ++++++++++++---------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  14 --
 .../helix/controller/stages/TestTaskStage.java     |  89 +++++++++--
 .../integration/task/TestJobQueueCleanUp.java      |  19 ++-
 .../task/TestWorkflowContextWithoutConfig.java     |  66 ++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   |  95 +++++++++++
 8 files changed, 438 insertions(+), 119 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 589988f..9a0bbb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,5 +40,9 @@ public enum AttributeName {
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER
+  STATEFUL_REBALANCER,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  TO_BE_PURGED_WORKFLOWS,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  TO_BE_PURGED_JOBS_MAP
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 7eb0db9..915cba1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -19,7 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixManager;
@@ -29,9 +32,12 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -42,34 +48,89 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void execute(ClusterEvent event) {
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+  public void process(ClusterEvent event) throws Exception {
+    // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
+    // This is to avoid race conditions since the cache will be modified. After this work, then the
+    // async work will happen.
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
-    if (dataProvider == null || manager == null) {
+    if (manager == null) {
       LOG.warn(
-          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Set<WorkflowConfig> existingWorkflows =
-        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
-    for (WorkflowConfig workflowConfig : existingWorkflows) {
-      // clean up the expired jobs if it is a queue.
+    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+    Set<String> workflowsToBePurged = new HashSet<>();
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        try {
-          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-              _rebalanceScheduler);
-        } catch (Exception e) {
-          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
-              workflowConfig.getWorkflowId(), e.toString()));
+        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
+        if (workflowContext == null) {
+          continue;
         }
+        long purgeInterval = workflowConfig.getJobPurgeInterval();
+        long currentTime = System.currentTimeMillis();
+        long nextPurgeTime = workflowContext.getLastJobPurgeTime() + purgeInterval;
+        if (purgeInterval > 0 && nextPurgeTime <= currentTime) {
+          nextPurgeTime = currentTime + purgeInterval;
+          // Find jobs that are ready to be purged
+          Set<String> expiredJobs =
+              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+          if (!expiredJobs.isEmpty()) {
+            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+          }
+        }
+        scheduleNextJobPurge(workflowConfig.getWorkflowId(), nextPurgeTime, _rebalanceScheduler,
+            manager);
+      } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
+          .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
+        // Find workflows that need to be purged
+        workflowsToBePurged.add(entry.getKey());
       }
     }
+    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
+        Collections.unmodifiableMap(expiredJobsMap));
+    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
+        Collections.unmodifiableSet(workflowsToBePurged));
+
+    super.process(event);
+  }
 
+  @Override
+  public void execute(ClusterEvent event) {
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    if (manager == null) {
+      LOG.warn(
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
+          event.getEventId(), event.getEventType(), event.getClusterName());
+      return;
+    }
+
+    Map<String, Set<String>> expiredJobsMap =
+        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
+    Set<String> toBePurgedWorkflows =
+        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+
+    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
+      try {
+        TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager, _rebalanceScheduler);
+      } catch (Exception e) {
+        LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
+      }
+    }
+
+    TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+  }
+
+  private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+    }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index e11f49e..e6e792f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,20 +735,8 @@ public class TaskUtil {
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-        if (jobConfig == null) {
-          LOG.error(String.format(
-              "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
-              job, workflowConfig.getWorkflowId()));
-          // Add the job name to expiredJobs so that purge operation will be tried again on this job
+        if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
           expiredJobs.add(job);
-          continue;
-        }
-        long expiry = jobConfig.getExpiry();
-        if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
-          if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-              && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
-            expiredJobs.add(job);
-          }
         }
       }
     }
@@ -756,6 +744,52 @@ public class TaskUtil {
   }
 
   /**
+   * Based on a workflow's config or context, create a set of jobs that are either expired, which
+   * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
+   * meaning that the job might have been deleted manually from a job queue, or is left in the
+   * DAG due to a failed clean-up attempt from last purge. The difference between this function and
+   * getExpiredJobs() is that this function gets JobConfig and JobContext from a
+   * WorkflowControllerDataProvider instead of Zk.
+   * @param workflowControllerDataProvider
+   * @param workflowConfig
+   * @param workflowContext
+   * @return
+   */
+  public static Set<String> getExpiredJobsFromCache(
+      WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    Set<String> expiredJobs = new HashSet<>();
+    Map<String, TaskState> jobStates = workflowContext.getJobStates();
+    for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+      JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
+      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+        expiredJobs.add(job);
+      }
+    }
+    return expiredJobs;
+  }
+
+  /*
+   * Checks if a job is expired and should be purged. This includes a special case when jobConfig
+   * is null. That happens when a job might have been deleted manually from a job queue, or is
+   * left in the DAG due to a failed clean-up attempt from last purge.
+   */
+  private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
+      TaskState jobState) {
+    if (jobConfig == null) {
+      LOG.warn(
+          "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
+          jobName);
+      return true;
+    }
+    long expiry = jobConfig.getExpiry();
+    return jobContext != null && jobState == TaskState.COMPLETED
+        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+  }
+
+  /**
    * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
    * @param accessor
    * @param propertyStore
@@ -977,72 +1011,71 @@ public class TaskUtil {
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+    // removal will be tried again at next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
+          workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+      // concurrent write issue. It is possible that jobs got purged but there is no event to
+      // trigger the pipeline to clean context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
+            expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+  /**
+   * The function that removes IdealStates and workflow contexts of the workflows that need to be
+   * deleted.
+   * @param toBePurgedWorkflows
+   * @param manager
+   */
+  public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
+      final HelixManager manager) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
+
+    for (String workflowName : toBePurgedWorkflows) {
+      LOG.warn(
+          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
+          workflowName);
+
+      // TODO: We won't need this in the future once TF no longer relies on IS/EV.
+      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
+            workflowName);
+        continue;
+      }
+
+      if (!removeWorkflowContext(propertyStore, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
+      }
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 53be558..4c9bd18 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,20 +356,6 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Set the job configuration
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    HelixProperty resourceConfig = new HelixProperty(jobResource);
-    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null) {
-      for (TaskConfig taskConfig : taskConfigMap.values()) {
-        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      }
-    }
-    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
     // Push out new ideal state based on number of target partitions
     IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index 99e227c..43fda00 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -19,9 +19,17 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -85,7 +93,7 @@ public class TestTaskStage extends TaskTestBase {
         TaskConstants.STATE_MODEL_NAME);
 
     // Create the context
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
     wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
     wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
     wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -144,15 +152,34 @@ public class TestTaskStage extends TaskTestBase {
    * async job purge will try to delete it again.
    */
   @Test(dependsOnMethods = "testPersistContextData")
-  public void testPartialDataPurge() {
+  public void testPartialDataPurge() throws Exception {
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
     // Manually delete JobConfig
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
 
+    // Manually refresh because there's no controller notify data change
+    BaseControllerDataProvider dataProvider =
+        _event.getAttribute(AttributeName.ControllerDataProvider.name());
+    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    dataProvider.refresh(_manager.getHelixDataAccessor());
+
     // Then purge jobs
     TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.execute(_event);
+    garbageCollectionStage.process(_event);
 
     // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
     // IdealState check
@@ -161,6 +188,41 @@ public class TestTaskStage extends TaskTestBase {
     checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
   }
 
+  @Test(dependsOnMethods = "testPartialDataPurge")
+  public void testWorkflowGarbageCollection() throws Exception {
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
+    String zkPath =
+        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
+    _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
+
+    // Manually refresh because there's no controller notify data change
+    BaseControllerDataProvider dataProvider =
+        _event.getAttribute(AttributeName.ControllerDataProvider.name());
+    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    dataProvider.refresh(_manager.getHelixDataAccessor());
+
+    // Then garbage collect workflow
+    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+    garbageCollectionStage.process(_event);
+
+    // Check that IS and contexts have been purged for the workflow
+    checkForIdealStateAndContextRemoval(_testWorkflow);
+
+    worker.shutdown();
+  }
+
   private void deleteJobConfigs(String workflowName, String jobName) {
     String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
     String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -169,16 +231,23 @@ public class TestTaskStage extends TaskTestBase {
     _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
   }
 
-  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
-    // IdealState
-    Assert.assertFalse(
-        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
-
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
     // JobContexts in old ZNode path
     String oldPath =
         String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
     String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+
+    Assert.assertTrue(TestHelper.verify(
+        () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
+            && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
+            .exists(newPath, AccessOption.PERSISTENT), 120000));
+  }
+
+  private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
+    Assert.assertTrue(TestHelper.verify(() ->
+            !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
+                && !_baseAccessor
+                .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
+        120000));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index daa9b4b..ba4fb44 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -80,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
   }
 
   @Test
-  public void testJobQueueAutoCleanUp() throws InterruptedException {
+  public void testJobQueueAutoCleanUp() throws Exception {
     int capacity = 10;
     String queueName = TestHelper.getTestMethodName();
     JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
@@ -105,14 +105,19 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     }
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
-    Thread.sleep(2000);
 
-    WorkflowConfig config = _driver.getWorkflowConfig(queueName);
-    Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+    Assert
+        .assertTrue(TestHelper.verify(() -> {
+          WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+          System.out.println("|Current time: " + System.currentTimeMillis() +" **TEST: " + config.getJobDag().getAllNodes());
+          return config.getJobDag().getAllNodes().equals(remainJobs);
+        }, TestHelper.WAIT_DURATION));
 
-    WorkflowContext context = _driver.getWorkflowContext(queueName);
-    Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
-    Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext context = _driver.getWorkflowContext(queueName);
+      return context.getJobStates().keySet().equals(remainJobs) && remainJobs
+          .containsAll(context.getJobStartTimes().keySet());
+    }, TestHelper.WAIT_DURATION));
 
     for (String job : deletedJobs) {
       JobConfig cfg = _driver.getJobConfig(job);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 70dd33f..31010cf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,6 +103,72 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(workflowContextNotCreated);
   }
 
+  @Test
+  public void testWorkflowContextGarbageCollection() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+    _driver.start(builder1.build());
+
+    // Wait until workflow is created and IN_PROGRESS state.
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
+    // workflow
+    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
+
+    String workflowContextPath =
+        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
+
+    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // Wait until workflow is completed.
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after the workflow
+    // has expired.
+    boolean workflowExpired = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
+      return (wCtx == null && wCfg == null && idealState == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(workflowExpired);
+
+    _controller.syncStop();
+
+    // Write workflow context to ZooKeeper
+    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
+        AccessOption.PERSISTENT);
+
+    // Verify context is written back to ZK.
+    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Create and start new workflow just to make sure controller is running and new workflow is
+    // scheduled successfully.
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+    _driver.start(builder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    // Verify that WorkflowContext will be deleted
+    boolean contextDeleted = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      return (wCtx == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(contextDeleted);
+  }
+
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..56e756d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTaskUtil extends TaskTestBase {
+
+  @Test
+  public void testGetExpiredJobsFromCache() {
+    String workflowName = "TEST_WORKFLOW";
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
+
+    JobConfig.Builder jobBuilder_0 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_1 =
+        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_2 =
+        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_3 =
+        new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    Workflow jobQueue =
+        queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+            .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    Map<String, TaskState> jobStates = new HashMap<>();
+    jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
+    jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
+    jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
+    when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+    JobConfig jobConfig = mock(JobConfig.class);
+    WorkflowControllerDataProvider workflowControllerDataProvider =
+        mock(WorkflowControllerDataProvider.class);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
+        .thenReturn(jobConfig);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
+        .thenReturn(jobConfig);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
+        .thenReturn(jobConfig);
+
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
+
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+        .thenReturn(jobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+        .thenReturn(jobContext);
+
+    Set<String> expectedJobs = new HashSet<>();
+    expectedJobs.add(workflowName + "_Job_0");
+    expectedJobs.add(workflowName + "_Job_3");
+    Assert.assertEquals(TaskUtil
+        .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
+            workflowContext), expectedJobs);
+  }
+}


[helix] 05/10: Remove previousAssignment from FixedTargetTaskAssignmentCalculator (#1061)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit cb8c696e3e7e1b6a984a4eda0e789f5518591f06
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jun 10 11:26:09 2020 -0700

    Remove previousAssignment from FixedTargetTaskAssignmentCalculator (#1061)
    
    Remove previousAssignment from FixedTargetTaskAssignmentCalculator
    
    The FixedTargetTaskAssignmentCalculator class is relying on the previousAssignment.
    In this commit, this class has been modified and previousAssignment has been
    replaced with the context information.
    Also several legacy job and workflow rebalancers have been removed.
---
 .../java/org/apache/helix/model/IdealState.java    |   18 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  |   15 +-
 .../helix/task/DeprecatedTaskRebalancer.java       | 1149 --------------------
 .../task/FixedTargetTaskAssignmentCalculator.java  |   47 +-
 .../helix/task/FixedTargetTaskRebalancer.java      |   60 -
 .../task/GenericTaskAssignmentCalculator.java      |   12 +-
 .../apache/helix/task/GenericTaskRebalancer.java   |   57 -
 .../java/org/apache/helix/task/JobDispatcher.java  |   18 +-
 .../helix/task/TaskAssignmentCalculator.java       |   18 +
 .../ThreadCountBasedTaskAssignmentCalculator.java  |   10 +
 .../TestFixedTargetedTaskAssignmentCalculator.java |  288 +++++
 11 files changed, 386 insertions(+), 1306 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 3274542..aafcca8 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -20,7 +20,9 @@ package org.apache.helix.model;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,8 +33,6 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.model.ResourceConfig.ResourceConfigProperty;
-import org.apache.helix.task.FixedTargetTaskRebalancer;
-import org.apache.helix.task.GenericTaskRebalancer;
 import org.apache.helix.task.JobRebalancer;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.task.WorkflowRebalancer;
@@ -74,6 +74,9 @@ public class IdealState extends HelixProperty {
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
+  public static final Set<String> LEGACY_TASK_REBALANCERS =
+      new HashSet<>(Arrays.asList("org.apache.helix.task.GenericTaskRebalancer",
+          "org.apache.helix.task.FixedTargetTaskRebalancer"));
 
   /**
    * Deprecated, use ResourceConfig.ResourceConfigConstants instead
@@ -721,11 +724,16 @@ public class IdealState extends HelixProperty {
       String rebalancerName = getRebalancerClassName();
       if (rebalancerName != null) {
         if (rebalancerName.equals(JobRebalancer.class.getName())
-            || rebalancerName.equals(WorkflowRebalancer.class.getName())
-            || rebalancerName.equals(GenericTaskRebalancer.class.getName())
-            || rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())) {
+            || rebalancerName.equals(WorkflowRebalancer.class.getName())) {
           property = RebalanceMode.TASK;
         } else {
+          if (LEGACY_TASK_REBALANCERS.contains(rebalancerName)) {
+            // Print a warning message if legacy TASK rebalancer is used
+            // Since the legacy rebalancers have been removed, it is not safe to run legacy
+            // jobs/workflows with the current task assignment strategy.
+            logger.warn("The rebalancer {} is not supported anymore. Setting rebalance mode to USER_DEFINED.",
+                rebalancerName);
+          }
           property = RebalanceMode.USER_DEFINED;
         }
       } else {
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index fa12203..ffbdcef 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -539,11 +539,10 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
-      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
-      final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
-      final WorkflowControllerDataProvider cache,
-      ResourceAssignment prevTaskToInstanceStateAssignment,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+      Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
+      JobContext jobCtx, final JobConfig jobCfg, final WorkflowConfig workflowConfig,
+      WorkflowContext workflowCtx, final WorkflowControllerDataProvider cache,
       Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
       Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
       Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
@@ -599,9 +598,9 @@ public abstract class AbstractTaskDispatcher {
 
     // The actual assignment is computed here
     // Get instance->[partition, ...] mappings for the target resource.
-    Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal.getTaskAssignment(
-        currStateOutput, prevTaskToInstanceStateAssignment, liveInstances, jobCfg, jobCtx,
-        workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());
+    Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+        taskAssignmentCal.getTaskAssignment(currStateOutput, liveInstances, jobCfg, jobCtx,
+            workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());
 
     if (!TaskUtil.isGenericTaskJob(jobCfg) && jobCfg.isRebalanceRunningTask()) {
       // TODO: Revisit the logic for isRebalanceRunningTask() and valid use cases for it
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
deleted file mode 100644
index 5202d41..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ /dev/null
@@ -1,1149 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Custom rebalancer implementation for the {@code Task} state model.
- */
-/** This rebalancer is deprecated, left here only for back-compatible. **/
-@Deprecated
-public abstract class DeprecatedTaskRebalancer
-    implements Rebalancer<WorkflowControllerDataProvider>,
-    MappingCalculator<WorkflowControllerDataProvider> {
-  private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
-
-  // Management of already-scheduled rebalances across jobs
-  private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
-  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor();
-  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
-  // For connection management
-  private HelixManager _manager;
-
-  /**
-   * Get all the partitions that should be created by this task
-   * @param jobCfg the task configuration
-   * @param jobCtx the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param cache cluster snapshot
-   * @return set of partition numbers
-   */
-  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache);
-
-  /**
-   * Compute an assignment of tasks to instances
-   * @param currStateOutput the current state of the instances
-   * @param prevAssignment the previous task partition assignment
-   * @param instances the instances
-   * @param jobCfg the task configuration
-   * @param jobContext the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param partitionSet the partitions to assign
-   * @param cache cluster snapshot
-   * @return map of instances to set of partition numbers
-   */
-  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      WorkflowControllerDataProvider cache);
-
-  @Override
-  public void init(HelixManager manager) {
-    _manager = manager;
-  }
-
-  @Override
-  public ResourceAssignment computeBestPossiblePartitionState(WorkflowControllerDataProvider clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
-    final String resourceName = resource.getResourceName();
-    LOG.debug("Computer Best Partition for resource: " + resourceName);
-
-    // Fetch job configuration
-    JobConfig jobCfg = (JobConfig) clusterData.getResourceConfig(resourceName);
-    if (jobCfg == null) {
-      LOG.debug("Job configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    String workflowResource = jobCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflowResource);
-    if (workflowCfg == null) {
-      LOG.debug("Workflow configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowResource);
-
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      workflowCtx.setName(workflowResource);
-      LOG.info("Workflow context for " + resourceName + " created!");
-    }
-
-    // check ancestor job status
-    int notStartedCount = 0;
-    int inCompleteCount = 0;
-    for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
-      TaskState jobState = workflowCtx.getJobState(ancestor);
-      if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++notStartedCount;
-      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
-        ++inCompleteCount;
-      }
-    }
-
-    if (notStartedCount > 0 || (workflowCfg.isJobQueue() && inCompleteCount >= workflowCfg
-        .getParallelJobs())) {
-      LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info(
-          "Workflow is marked as deleted " + workflowResource
-              + " cleaning up the workflow context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check if this workflow has been finished past its expiry.
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      LOG.info("Workflow " + workflowResource
-          + " is completed and passed expiry time, cleaning up the workflow context.");
-      markForDeletion(_manager, workflowResource);
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Fetch any existing context information from the property store.
-
-    JobContext jobCtx = clusterData.getJobContext(resourceName);
-    if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
-      jobCtx.setStartTime(System.currentTimeMillis());
-      jobCtx.setName(resourceName);
-    }
-
-    // Check for expired jobs for non-terminable workflows
-    long jobFinishTime = jobCtx.getFinishTime();
-    if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
-        && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      LOG.info("Job " + resourceName
-          + " is completed and passed expiry time, cleaning up the job context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // The job is already in a final state (completed/failed).
-    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
-        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
-      LOG.debug("Job " + resourceName + " is failed or already completed.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
-    if (!isReady) {
-      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(_manager, resourceName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(resourceName);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-
-    ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
-            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
-            clusterData);
-
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update Workflow and Job context in data cache and ZK.
-    clusterData.updateJobContext(resourceName, jobCtx);
-    clusterData
-        .updateWorkflowContext(workflowResource, workflowCtx);
-
-    setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
-    LOG.debug("Job " + resourceName + " new assignment " + Arrays
-        .toString(newAssignment.getMappedPartitions().toArray()));
-
-    return newAssignment;
-  }
-
-  /**
-   * Get the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  private ResourceAssignment getPrevResourceAssignment(HelixManager manager,
-      String resourceName) {
-    ZNRecord r =
-        manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-            null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  public void setPrevResourceAssignment(HelixManager manager, String resourceName,
-      ResourceAssignment ra) {
-    manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
-      WorkflowConfig workflowCfg, WorkflowControllerDataProvider cache) {
-
-    Set<String> ret = new HashSet<String>();
-
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      if (jobName.equals(currentJobName)) {
-        continue;
-      }
-
-      JobContext jobContext = cache.getJobContext(jobName);
-      if (jobContext == null) {
-        continue;
-      }
-      for (int partition : jobContext.getPartitionSet()) {
-        TaskPartitionState partitionState = jobContext.getPartitionState(partition);
-        if (partitionState == TaskPartitionState.INIT ||
-            partitionState == TaskPartitionState.RUNNING) {
-          ret.add(jobContext.getAssignedParticipant(partition));
-        }
-      }
-    }
-
-    return ret;
-  }
-
-  private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
-      WorkflowControllerDataProvider cache) {
-    TargetState jobTgtState = workflowConfig.getTargetState();
-
-    // Update running status in workflow context
-    if (jobTgtState == TargetState.STOP) {
-      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
-      // Workflow has been stopped if all jobs are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Used to keep track of tasks that have failed, but whose failure is acceptable
-    Set<Integer> skippedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
-    Set<String> excludedInstances =
-        getInstancesAssignedToOtherJobs(jobResource, workflowConfig, cache);
-
-    // Process all the current assignments of tasks.
-    Set<Integer> allPartitions =
-        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
-    long currentTime = System.currentTimeMillis();
-    for (String instance : taskAssignments.keySet()) {
-      if (excludedInstances.contains(instance)) {
-        continue;
-      }
-
-      Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(jobResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        Message pendingMessage =
-            currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null) {
-          // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-          if (stateMap != null) {
-            String prevState = stateMap.get(instance);
-            paMap.put(pId, new PartitionAssignment(instance, prevState));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format(
-                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-                      pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
-                pName), instance));
-        jobCtx.setPartitionState(pId, currState);
-
-        // Process any requested state transitions.
-        String requestedStateStr =
-            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format(
-              "Instance %s requested a state transition to %s for partition %s.", instance,
-              requestedState, pName));
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING:
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (jobTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-              nextState, instance));
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
-          LOG.debug(String
-              .format(
-                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                  pName, currState));
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
-          LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in rebalancer context.",
-              pName, currState));
-          markPartitionError(jobCtx, pId, currState, true);
-          // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of attempts.
-          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
-            // If the user does not require this task to succeed in order for the job to succeed,
-            // then we don't have to fail the job right now
-            boolean successOptional = false;
-            String taskId = jobCtx.getTaskIdForPartition(pId);
-            if (taskId != null) {
-              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-              if (taskConfig != null) {
-                successOptional = taskConfig.isSuccessOptional();
-              }
-            }
-
-            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
-            // to fail the job immediately
-            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
-              successOptional = true;
-            }
-
-            if (!successOptional) {
-              long finishTime = currentTime;
-              workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              if (workflowConfig.isTerminable()) {
-                workflowCtx.setWorkflowState(TaskState.FAILED);
-                workflowCtx.setFinishTime(finishTime);
-              }
-              jobCtx.setFinishTime(finishTime);
-              markAllPartitionsError(jobCtx, currState, false);
-              addAllPartitions(allPartitions, partitionsToDropFromIs);
-              return emptyAssignment(jobResource, currStateOutput);
-            } else {
-              skippedPartitions.add(pId);
-              partitionsToDropFromIs.add(pId);
-            }
-          } else {
-            // Mark the task to be started at some later time (if enabled)
-            markPartitionDelayed(jobCfg, jobCtx, pId);
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-          donePartitions.add(pId);
-          LOG.debug(String.format(
-              "Task partition %s has state %s. It will be dropped from the current ideal state.",
-              pName, currState));
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
-    scheduleForNextTask(jobResource, jobCtx, currentTime);
-
-    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
-      jobCtx.setFinishTime(currentTime);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-      }
-    }
-
-    // Make additional task assignments if needed.
-    if (jobTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from consideration when making
-      // any new assignments.
-      // This includes all completed, failed, delayed, and already assigned partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
-      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
-      excludeSet.addAll(skippedPartitions);
-      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache);
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
-          continue;
-        }
-        // Contains the set of task partitions currently assigned to the instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(jobResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            jobCtx.setAssignedParticipant(pId, instance);
-            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(jobResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
-          ImmutableMap.of(pa._instance, pa._state));
-    }
-
-    return ra;
-  }
-
-  /**
-   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
-   * @param workflowCfg the workflow to check
-   * @param workflowCtx the current workflow context
-   * @param workflowResource the Helix resource associated with the workflow
-   * @param jobResource a job from the workflow
-   * @param cache the current snapshot of the cluster
-   * @return true if ready, false if not ready
-   */
-  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      String workflowResource, String jobResource, WorkflowControllerDataProvider cache) {
-    // Ignore non-scheduled workflows
-    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
-      return true;
-    }
-
-    // Figure out when this should be run, and if it's ready, then just run it
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    Date startTime = scheduleConfig.getStartTime();
-    long currentTime = new Date().getTime();
-    long delayFromStart = startTime.getTime() - currentTime;
-
-    if (delayFromStart <= 0) {
-      // Remove any timers that are past-time for this workflow
-      Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
-      if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-        LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-
-      // Recurring workflows are just templates that spawn new workflows
-      if (scheduleConfig.isRecurring()) {
-        // Skip scheduling this workflow if it's not in a start state
-        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          LOG.debug(
-              "Skip scheduling since the workflow has not been started " + workflowResource);
-          return false;
-        }
-
-        // Skip scheduling this workflow again if the previous run (if any) is still active
-        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
-        if (lastScheduled != null) {
-          WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
-          if (lastWorkflowCtx != null
-              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
-            return false;
-          }
-        }
-
-        // Figure out how many jumps are needed, thus the time to schedule the next workflow
-        // The negative of the delay is the amount of time past the start time
-        long period =
-            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
-        long offsetMultiplier = (-delayFromStart) / period;
-        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
-        // Now clone the workflow if this clone has not yet been created
-        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
-        // Now clone the workflow if this clone has not yet been created
-        String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
-        LOG.debug("Ready to start workflow " + newWorkflowName);
-        if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf =
-              cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(timeToSchedule));
-          TaskDriver driver = new TaskDriver(_manager);
-          try {
-            // Start the cloned workflow
-            driver.start(clonedWf);
-          } catch (Exception e) {
-            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-          }
-          // Persist workflow start regardless of success to avoid retrying and failing
-          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          cache.updateWorkflowContext(workflowResource, workflowCtx);
-        }
-
-        // Change the time to trigger the pipeline to that of the next run
-        startTime = new Date(timeToSchedule + period);
-        delayFromStart = startTime.getTime() - System.currentTimeMillis();
-      } else {
-        // This is a one-time workflow and is ready
-        return true;
-      }
-    }
-
-    scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
-    return false;
-  }
-
-  /**
-   * Create a new workflow based on an existing one
-   * @param manager connection to Helix
-   * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName the name of the new workflow
-   * @param newStartTime a provided start time that deviates from the desired start time
-   * @return the cloned workflow, or null if there was a problem cloning the existing one
-   */
-  private Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
-      String newWorkflowName, Date newStartTime) {
-    // Read all resources, including the workflow and jobs of interest
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Map<String, HelixProperty> resourceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
-    if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
-      return null;
-    }
-    if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
-      return null;
-    }
-
-    // Create a new workflow with a new name
-    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
-    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
-    JobDag jobDag =
-        JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
-
-    // Set the workflow expiry
-    builder.setExpiry(
-        Long.parseLong(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Expiry.name())));
-
-    // Set the schedule, if applicable
-    ScheduleConfig scheduleConfig;
-    if (newStartTime != null) {
-      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-    } else {
-      scheduleConfig = WorkflowConfig.parseScheduleFromConfigMap(wfSimpleFields);
-    }
-    if (scheduleConfig != null) {
-      builder.setScheduleConfig(scheduleConfig);
-    }
-
-    // Add each job back as long as the original exists
-    Set<String> namespacedJobs = jobDag.getAllNodes();
-    for (String namespacedJob : namespacedJobs) {
-      if (resourceConfigMap.containsKey(namespacedJob)) {
-        // Copy over job-level and task-level configs
-        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
-        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
-        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-        jobSimpleFields.put(JobConfig.JobConfigProperty.WorkflowID.name(), newWorkflowName); // overwrite workflow name
-        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
-          builder.addConfig(job, e.getKey(), e.getValue());
-        }
-        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
-        List<TaskConfig> taskConfigs = Lists.newLinkedList();
-        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
-          taskConfigs.add(taskConfig);
-        }
-        builder.addTaskConfigs(job, taskConfigs);
-
-        // Add dag dependencies
-        Set<String> children = parentsToChildren.get(namespacedJob);
-        if (children != null) {
-          for (String namespacedChild : children) {
-            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
-            builder.addParentChildDependency(job, child);
-          }
-        }
-      }
-    }
-    return builder.build();
-  }
-
-  private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
-    // Do nothing if there is already a timer set for the this workflow with the same start time.
-    if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
-        || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
-      LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
-      return;
-    }
-    LOG.info(
-        "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
-            + " delay from start: " + delayFromStart);
-
-    // For workflows not yet scheduled, schedule them and record it
-    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
-    SCHEDULED_TIMES.put(id, startTime);
-    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
-  }
-
-  private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
-    // Clear current entries if they exist and are expired
-    long currentTime = now;
-    Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
-    if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-      LOG.debug(
-          "Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
-      SCHEDULED_TIMES.remove(jobResource);
-    }
-
-    // Figure out the earliest schedulable time in the future of a non-complete job
-    boolean shouldSchedule = false;
-    long earliestTime = Long.MAX_VALUE;
-    for (int p : ctx.getPartitionSet()) {
-      long retryTime = ctx.getNextRetryTime(p);
-      TaskPartitionState state = ctx.getPartitionState(p);
-      state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
-      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
-        earliestTime = retryTime;
-        shouldSchedule = true;
-      }
-    }
-
-    // If any was found, then schedule it
-    if (shouldSchedule) {
-      long delay = earliestTime - currentTime;
-      Date startTime = new Date(earliestTime);
-      scheduleRebalance(jobResource, jobResource, startTime, delay);
-    }
-  }
-
-  /**
-   * Checks if the job has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @param skippedPartitions partitions that failed, but whose failure is acceptable
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
-      Set<Integer> skippedPartitions, JobConfig cfg) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
-          && !isTaskGivenup(ctx, cfg, pId)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has completed.
-   * @param ctx Workflow context containing job states
-   * @param cfg Workflow config containing set of jobs
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
-   */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    if (!cfg.isTerminable()) {
-      return false;
-    }
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has been stopped.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
-   */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static void markForDeletion(HelixManager mgr, String resourceName) {
-    mgr.getConfigAccessor().set(
-        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.WorkflowConfigProperty.TargetState.name(), TargetState.DELETE.name());
-  }
-
-  /**
-   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
-   * is the last remaining job in its workflow, and the workflow is terminable.
-   */
-  private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
-      String workflowResource) {
-    LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
-    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-
-    // Remove any DAG references in workflow
-    PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag
-            .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        for (String child : jobDag.getDirectChildren(resourceName)) {
-          jobDag.getChildrenToParents().get(child).remove(resourceName);
-        }
-        for (String parent : jobDag.getDirectParents(resourceName)) {
-          jobDag.getParentsToChildren().get(parent).remove(resourceName);
-        }
-        jobDag.getChildrenToParents().remove(resourceName);
-        jobDag.getParentsToChildren().remove(resourceName);
-        jobDag.getAllNodes().remove(resourceName);
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          LOG.equals("Could not update DAG for job: " + resourceName);
-        }
-        return currentData;
-      }
-    };
-    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
-        AccessOption.PERSISTENT);
-
-    // Delete resource configs.
-    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-          resourceName,
-          cfgKey));
-    }
-
-    // Delete property store information for this resource.
-    // For recurring workflow, it's OK if the node doesn't exist.
-    String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
-          resourceName, isKey));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    accessor.removeProperty(evKey);
-
-    LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
-
-    boolean lastInWorkflow = true;
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      // check if property store information or resource configs exist for this job
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
-          AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
-        lastInWorkflow = false;
-        break;
-      }
-    }
-
-    // clean up workflow-level info if this was the last in workflow
-    if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
-      // delete workflow config
-      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowCfgKey));
-      }
-      // Delete property store information for this workflow
-      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowPropStoreKey));
-      }
-      // Remove pending timer for this workflow if exists
-      if (SCHEDULED_TIMES.containsKey(workflowResource)) {
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-    }
-
-  }
-
-  private static String getRebalancerPropStoreKey(String resource) {
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
-  }
-
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().idealStates(resource);
-  }
-
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
-  }
-
-  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
-    for (Integer pId : toAdd) {
-      destination.add(pId);
-    }
-  }
-
-  private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) {
-    ResourceAssignment assignment = new ResourceAssignment(name);
-    Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
-    for (Partition partition : partitions) {
-      Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition);
-      Map<String, String> replicaMap = Maps.newHashMap();
-      for (String instanceName : currentStateMap.keySet()) {
-        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
-      }
-      assignment.addReplicaMap(partition, replicaMap);
-    }
-    return assignment;
-  }
-
-  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
-    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-  }
-
-  // add all partitions that have been tried maxNumberAttempts
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
-
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
-    }
-
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
-  }
-
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
-  }
-
-  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
-
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = pId(partition.getPartitionName());
-      if (includeSet.contains(pId)) {
-        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-    return result;
-  }
-
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
-      }
-    }
-    return nonReadyPartitions;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  protected static String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * Extracts the partition id from the given partition name.
-   */
-  protected static int pId(String pName) {
-    String[] tokens = pName.split("_");
-    return Integer.valueOf(tokens[tokens.length - 1]);
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
-  }
-
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, WorkflowControllerDataProvider clusterData) {
-    // All of the heavy lifting is in the ResourceAssignment computation,
-    // so this part can just be a no-op.
-    return currentIdealState;
-  }
-
-  /**
-   * The simplest possible runnable that will trigger a run of the controller pipeline
-   */
-  private static class RebalanceInvoker implements Runnable {
-    private final HelixManager _manager;
-    private final String _resource;
-
-    public RebalanceInvoker(HelixManager manager, String resource) {
-      _manager = manager;
-      _resource = resource;
-    }
-
-    @Override
-    public void run() {
-      RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
-    }
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 5721921..8a29232 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -77,12 +77,22 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
-    return computeAssignmentAndChargeResource(currStateOutput, prevAssignment, instances,
-        workflowCfg, jobCfg, jobContext, partitionSet, idealStateMap);
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Map<String, IdealState> idealStateMap) {
+    return computeAssignmentAndChargeResource(currStateOutput, instances, workflowCfg, jobCfg,
+        jobContext, partitionSet, idealStateMap);
   }
 
   /**
@@ -175,7 +185,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    * Calculate the assignment for given tasks. This assignment also charges resources for each task
    * and takes resource/quota availability into account while assigning.
    * @param currStateOutput
-   * @param prevAssignment
    * @param liveInstances
    * @param jobCfg
    * @param jobContext
@@ -184,9 +193,9 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    * @return instance -> set of task partition numbers
    */
   private Map<String, SortedSet<Integer>> computeAssignmentAndChargeResource(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, WorkflowConfig workflowCfg, JobConfig jobCfg,
-      JobContext jobContext, Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
+      CurrentStateOutput currStateOutput, Collection<String> liveInstances,
+      WorkflowConfig workflowCfg, JobConfig jobCfg, JobContext jobContext,
+      Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
 
     // Note: targeted jobs also take up capacity in quota-based scheduling
     // "Charge" resources for the tasks
@@ -250,23 +259,27 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
             // the new assignment differs from prevAssignment, release. If the assigned instances
             // from old and new assignments are the same, then do nothing and let it keep running
             // The following checks if two assignments (old and new) differ
-            Map<String, String> instanceMap = prevAssignment.getReplicaMap(new Partition(pName));
-            Iterator<String> itr = instanceMap.keySet().iterator();
+
             // First, check if this taskPartition has been ever assigned before by checking
-            // prevAssignment
-            if (itr.hasNext()) {
-              String prevInstance = itr.next();
-              if (!prevInstance.equals(instance)) {
-                // Old and new assignments are different. We need to release from prevInstance, and
-                // this task will be assigned to a different instance
+            // jobContext's AssignedParticipant field
+            String prevAssignedInstance = jobContext.getAssignedParticipant(targetPartitionId);
+            TaskPartitionState taskState = jobContext.getPartitionState(targetPartitionId);
+
+            if (prevAssignedInstance != null && taskState != null
+                && (taskState.equals(TaskPartitionState.INIT)
+                    || taskState.equals(TaskPartitionState.RUNNING))) {
+              // If the task is in active state and old and new assignments are different, we need
+              // to release from prevInstance, and this task will be assigned to a different
+              // instance
+              if (!prevAssignedInstance.equals(instance)) {
                 if (_assignableInstanceManager.getAssignableInstanceNames()
-                    .contains(prevInstance)) {
-                  _assignableInstanceManager.release(prevInstance, taskConfig, quotaType);
+                    .contains(prevAssignedInstance)) {
+                  _assignableInstanceManager.release(prevAssignedInstance, taskConfig, quotaType);
                 } else {
                   // This instance must be no longer live
                   LOG.warn(
                       "Task {} was reassigned from old instance: {} to new instance: {}. However, old instance: {} is not found in AssignableInstanceMap. The old instance is possibly no longer a LiveInstance. This task will not be released.",
-                      pName, prevAssignment, instance);
+                      pName, prevAssignedInstance, instance, prevAssignedInstance);
                 }
               } else {
                 // Old and new assignments are the same, so just skip assignment for this
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
deleted file mode 100644
index 49cd4d6..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.ResourceAssignment;
-/**
- * A rebalancer for when a task group must be assigned according to partitions/states on a target
- * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
- * (if desired) only where those partitions are in a given state.
- */
-
-/**
- * This rebalancer is deprecated, left here only for back-compatible. *
- */
-@Deprecated
-public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer {
-  private FixedTargetTaskAssignmentCalculator taskAssignmentCalculator =
-      new FixedTargetTaskAssignmentCalculator();
-
-  @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index c8989f8..10f2d82 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -64,10 +64,20 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, final JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Map<String, IdealState> idealStateMap) {
 
     if (jobCfg.getTargetResource() != null) {
       LOG.error(
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
deleted file mode 100644
index ab290d7..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.ResourceAssignment;
-
-
-/**
- * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
- * assignment to target partitions and states of another resource
- */
-/** This rebalancer is deprecated, left here only for back-compatible. **/
-@Deprecated
-public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
-  private GenericTaskAssignmentCalculator taskAssignmentCalculator =
-      new GenericTaskAssignmentCalculator();
-
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
-      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 10a1b7c..191a2ea 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -185,8 +185,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     Set<Integer> partitionsToDrop = new TreeSet<>();
     ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, prevAssignment,
-            liveInstances, currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);
+        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, liveInstances,
+            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);
 
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
@@ -200,9 +200,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
   private ResourceAssignment computeResourceMapping(String jobResource,
       WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
-      ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> liveInstances,
-      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx,
-      Set<Integer> partitionsToDropFromIs, WorkflowControllerDataProvider cache) {
+      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+      WorkflowControllerDataProvider cache) {
 
     // Used to keep track of tasks that have already been assigned to instances.
     // InstanceName -> Set of task partitions assigned to that instance in this iteration
@@ -316,10 +316,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances, jobResource,
-          currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
-          prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
-          taskAssignmentCal, allPartitions, currentTime, liveInstances);
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances,
+          jobResource, currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
+          assignedPartitions, paMap, skippedPartitions, taskAssignmentCal, allPartitions,
+          currentTime, liveInstances);
     }
 
     return toResourceAssignment(jobResource, paMap);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 8cbea97..793ba6c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -56,6 +56,7 @@ public abstract class TaskAssignmentCalculator {
    * @param idealStateMap the map of resource name map to ideal state
    * @return map of instances to set of partition numbers
    */
+  @Deprecated
   public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
@@ -63,6 +64,23 @@ public abstract class TaskAssignmentCalculator {
       Map<String, IdealState> idealStateMap);
 
   /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param instances the instances
+   * @param jobCfg the job configuration
+   * @param jobContext the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param idealStateMap the map of resource name to ideal state
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, Collection<String> instances, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap);
+
+  /**
    * Returns the correct type for this job. Note that if the parent workflow has a type, then all of
    * its jobs will inherit the type from the workflow.
    * @param workflowConfig
diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index 129618b..ad66be0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -75,10 +75,20 @@ public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalc
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Map<String, IdealState> idealStateMap) {
 
     if (jobCfg.getTargetResource() != null) {
       LOG.error(
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
new file mode 100644
index 0000000..10283d0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
@@ -0,0 +1,288 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.SortedSet;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This unit test makes sure that FixedTargetTaskAssignmentCalculator makes correct decisions for
+ * targeted jobs.
+ */
+public class TestFixedTargetedTaskAssignmentCalculator {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_PARTITION_NAME = "0";
+  private static final int PARTITION_ID = 0;
+  private static final String TARGET_RESOURCES = "TestDB";
+
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a job has been assigned
+   * before and the target partition is still on the same instance and in RUNNING state,
+   * we do not make a new assignment for that task.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorSameInstanceRunningTask() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 1;
+    String slaveInstance1 = INSTANCE_PREFIX + 2;
+    String slaveInstance2 = INSTANCE_PREFIX + 3;
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, masterInstance);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(masterInstance).size(),0);
+    Assert.assertEquals(result.get(slaveInstance1).size(),0);
+    Assert.assertEquals(result.get(slaveInstance2).size(),0);
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a job has been assigned
+   * before and the target partition is still on the same instance and in INIT state,
+   * we do not make a new assignment for that task.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorSameInstanceInitTask() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 1;
+    String slaveInstance1 = INSTANCE_PREFIX + 2;
+    String slaveInstance2 = INSTANCE_PREFIX + 3;
+    // Preparing the inputs
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.INIT,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.INIT, masterInstance);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(masterInstance).size(),0);
+    Assert.assertEquals(result.get(slaveInstance1).size(),0);
+    Assert.assertEquals(result.get(slaveInstance2).size(),0);
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a job has been assigned
+   * before and the target partition has moved to another instance, the controller assigns the
+   * task to the new/correct instance.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorDifferentInstance() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 2;
+    String slaveInstance1 = INSTANCE_PREFIX + 1;
+    String slaveInstance2 = INSTANCE_PREFIX + 3;
+    // Preparing the inputs
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, slaveInstance1);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(slaveInstance1).size(),0);
+    Assert.assertEquals(result.get(masterInstance).size(),1);
+    Assert.assertEquals(result.get(slaveInstance2).size(),0);
+  }
+
+  /**
+   * Builds the JobConfig shared by the tests: job JOB_NAME in workflow WORKFLOW_NAME, targeting
+   * replicas of TARGET_RESOURCES that are in MASTER state, with a single task whose id is "0".
+   * @return the built JobConfig
+   */
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setJobId(JOB_NAME);
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    // The list of target partitions is left empty; it only needs to be set once
+    List<String> targetPartition = new ArrayList<>();
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>(Collections.singletonList("MASTER"));
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    // A single task with id "0"
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    return jobConfigBuilder.build();
+  }
+
+  /**
+   * Builds a JobContext for JOB_NAME in which the task PARTITION_ID is recorded with the given
+   * state, targets the partition TARGET_RESOURCES + "_" + TARGET_PARTITION_NAME, and is marked as
+   * assigned to the given instance.
+   * @param taskPartitionState state to record for the task in the context
+   * @param instance instance to record as the task's assigned participant
+   * @return the populated JobContext
+   */
+  private JobContext prepareJobContext(TaskPartitionState taskPartitionState, String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setPartitionState(PARTITION_ID, taskPartitionState);
+    jobContext.setPartitionTarget(PARTITION_ID, TARGET_RESOURCES + "_" + TARGET_PARTITION_NAME);
+    jobContext.setAssignedParticipant(PARTITION_ID, instance);
+    return jobContext;
+  }
+
+  /**
+   * Builds a CurrentStateOutput in which the task partition JOB_NAME + "_" + PARTITION_NAME is
+   * in the given state on masterInstance, and the target partition TARGET_RESOURCES + "_0" is
+   * MASTER on masterInstance and SLAVE on both slave instances.
+   * @param taskCurrentState current state to report for the task on masterInstance
+   * @param masterInstance instance reported as MASTER for the target partition
+   * @param slaveInstance1 first instance reported as SLAVE for the target partition
+   * @param slaveInstance2 second instance reported as SLAVE for the target partition
+   * @return the populated CurrentStateOutput
+   */
+  private CurrentStateOutput prepareCurrentState(TaskPartitionState taskCurrentState,
+      String masterInstance, String slaveInstance1, String slaveInstance2) {
+    CurrentStateOutput output = new CurrentStateOutput();
+
+    // Current state of the task itself
+    output.setResourceStateModelDef(JOB_NAME, "TASK");
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    String taskStateName = taskCurrentState.name();
+    output.setCurrentState(JOB_NAME, taskPartition, masterInstance, taskStateName);
+
+    // Current states of the target resource's partition on all three instances
+    Partition targetPartition = new Partition(TARGET_RESOURCES + "_0");
+    output.setEndTime(TARGET_RESOURCES, targetPartition, masterInstance, 0L);
+    output.setCurrentState(TARGET_RESOURCES, targetPartition, masterInstance, "MASTER");
+    output.setCurrentState(TARGET_RESOURCES, targetPartition, slaveInstance1, "SLAVE");
+    output.setCurrentState(TARGET_RESOURCES, targetPartition, slaveInstance2, "SLAVE");
+    output.setInfo(TARGET_RESOURCES, targetPartition, masterInstance, "");
+    return output;
+  }
+
+  /**
+   * Builds the WorkflowConfig shared by the tests: a non-terminable job queue named
+   * WORKFLOW_NAME with target state START whose DAG contains the single node JOB_NAME.
+   * @return the built WorkflowConfig
+   */
+  private WorkflowConfig prepareWorkflowConfig() {
+    // DAG with JOB_NAME as its only node
+    JobDag dag = new JobDag();
+    dag.addNode(JOB_NAME);
+
+    WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+    builder.setWorkflowId(WORKFLOW_NAME);
+    builder.setTerminable(false);
+    builder.setTargetState(TargetState.START);
+    builder.setJobQueue(true);
+    builder.setJobDag(dag);
+    return builder.build();
+  }
+
+  /**
+   * Builds a WorkflowContext for WORKFLOW_NAME whose start time is "0", whose state is
+   * IN_PROGRESS, and whose job-state map marks JOB_NAME as IN_PROGRESS.
+   * @return the WorkflowContext wrapping the prepared record
+   */
+  private WorkflowContext prepareWorkflowContext() {
+    String inProgress = TaskState.IN_PROGRESS.name();
+
+    // Per-job states: the only job is in progress
+    Map<String, String> jobStates = new HashMap<>();
+    jobStates.put(JOB_NAME, inProgress);
+
+    ZNRecord contextRecord = new ZNRecord(WORKFLOW_NAME);
+    contextRecord.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    contextRecord.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(),
+        WORKFLOW_NAME);
+    contextRecord.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        inProgress);
+    contextRecord.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(),
+        jobStates);
+    return new WorkflowContext(contextRecord);
+  }
+
+  /**
+   * Builds the IdealState map for the target resource TARGET_RESOURCES. Its single partition
+   * TARGET_RESOURCES + "_0" has instance1 as MASTER and instance2/instance3 as SLAVE, in both
+   * the map field and the preference list.
+   * @param instance1 instance mapped to MASTER and listed first in the preference list
+   * @param instance2 instance mapped to SLAVE
+   * @param instance3 instance mapped to SLAVE
+   * @return map from resource name to its IdealState, containing only TARGET_RESOURCES
+   */
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
+    Map<String, IdealState> idealStates = new HashMap<>();
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(),
+        "AUTO_REBALANCE");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "MasterSlave");
+    // The strategy class belongs under REBALANCE_STRATEGY; writing it under
+    // STATE_MODEL_DEF_REF would overwrite the "MasterSlave" state model set above.
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    // Replica -> state mapping of the only partition
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    // Preference list of the only partition
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+}


[helix] 03/10: Make the task scheduling decision independent of the PreviousAssignment (#994)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ce100d2230c2f136092efb0d2ca7ef6d7a697326
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue May 26 16:49:42 2020 -0700

    Make the task scheduling decision independent of the PreviousAssignment (#994)
    
    In this commit, the task scheduling logic, which was previously based on PreviousAssignment,
    has been changed so that it no longer depends on PreviousAssignment. Instead, task scheduling is based solely on the CurrentState.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 100 ++++++++++++++-------
 .../java/org/apache/helix/task/JobDispatcher.java  |  74 +++++----------
 .../helix/task/TestDropTerminalTasksUponReset.java |   3 +-
 3 files changed, 90 insertions(+), 87 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 60c2402..8934337 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -66,7 +66,7 @@ public abstract class AbstractTaskDispatcher {
   // Job Update related methods
 
   public void updatePreviousAssignedTasksStatus(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
       Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
@@ -78,11 +78,11 @@ public abstract class AbstractTaskDispatcher {
     AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
 
     // Iterate through all instances
-    for (String instance : prevInstanceToTaskAssignments.keySet()) {
+    for (String instance : currentInstanceToTaskAssignments.keySet()) {
       assignedPartitions.put(instance, new HashSet<>());
 
       // Set all dropping transitions first. These are tasks coming from Participant disconnects
-      // that have some active current state (INIT or RUNNING) and the requestedState of DROPPED.
+      // and have the requestedState of DROPPED.
       // These need to be prioritized over any other state transitions because of the race condition
       // with the same pId (task) running on other instances. This is because in paMap, we can only
       // define one transition per pId
@@ -99,7 +99,7 @@ public abstract class AbstractTaskDispatcher {
       }
 
       // If not an excluded instance, we must instantiate its entry in assignedPartitions
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
 
       // We need to remove all task pId's to be dropped because we already made an assignment in
       // paMap above for them to be dropped. The following does this.
@@ -107,8 +107,7 @@ public abstract class AbstractTaskDispatcher {
         pSet.removeAll(tasksToDrop.get(instance));
       }
 
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in either INIT or DROPPED states
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
@@ -121,17 +120,6 @@ public abstract class AbstractTaskDispatcher {
               instance, pId);
           continue;
         }
-        // This avoids a race condition in the case that although currentState is in the following
-        // error condition, the pending message (INIT->RUNNNING) might still be present.
-        // This is undesirable because this prevents JobContext from getting the proper update of
-        // fields including task state and task's NUM_ATTEMPTS
-        if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR
-            || currState == TaskPartitionState.TIMED_OUT
-            || currState == TaskPartitionState.TASK_ABORTED) {
-          // Do not increment the task attempt count here - it will be incremented at scheduling
-          // time
-          markPartitionError(jobCtx, pId, currState);
-        }
 
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
@@ -242,16 +230,16 @@ public abstract class AbstractTaskDispatcher {
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
                 pName, currState));
           }
           partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-
           // This task is COMPLETED, so release this task
           assignableInstanceManager.release(instance, taskConfig, quotaType);
         }
@@ -263,7 +251,11 @@ public abstract class AbstractTaskDispatcher {
         case TASK_ABORTED:
 
         case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          // First, mark this task, which is in a terminal state, to be dropped.
+          // Later on, in the next pipeline, handleAdditionalTaskAssignment will retry the task if possible
+          // (meaning it is not ABORTED and the max number of attempts has not been reached yet).
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
@@ -389,13 +381,59 @@ public abstract class AbstractTaskDispatcher {
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
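+   * @param currentStateOutput CurrentStateOutput from which the task's info message is read
+   * @param jobResource name of the job resource the task belongs to
+   * @param currentState state of the task as reported in the CurrentState
+   * @param jobCtx job context to be updated
+   * @param pId partition id of the task
+   * @param pName partition name of the task
+   * @param instance instance to which this CurrentState belongs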
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on the current state and we do not want to
+    // keep updating the context for as long as the current state exists. We only want to update
+    // the context information (especially the finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }
+      if (currentState == TaskPartitionState.COMPLETED) {
+        markPartitionCompleted(jobCtx, pId);
+      }
+      // This avoids a race condition in the case that although currentState is in the following
+      // error condition, the pending message (INIT->RUNNING) might still be present.
+      // This is undesirable because this prevents JobContext from getting the proper update of
+      // fields including task state and task's NUM_ATTEMPTS
+      if (currentState == TaskPartitionState.ERROR || currentState == TaskPartitionState.TASK_ERROR
+          || currentState == TaskPartitionState.TIMED_OUT
+          || currentState == TaskPartitionState.TASK_ABORTED) {
+        // Do not increment the task attempt count here - it will be incremented at scheduling
+        // time
+        markPartitionError(jobCtx, pId, currentState);
+      }
     }
-    return currentState;
   }
 
   /**
@@ -511,7 +549,7 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
       final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
       final WorkflowControllerDataProvider cache,
@@ -580,7 +618,7 @@ public abstract class AbstractTaskDispatcher {
       // TODO: isRebalanceRunningTask() was originally put in place to allow users to move
       // ("rebalance") long-running tasks, but there hasn't been a clear use case for this
       // Previously, there was a bug in the condition above (it was || where it should have been &&)
-      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
           jobCtx);
     }
 
@@ -588,11 +626,11 @@ public abstract class AbstractTaskDispatcher {
     if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
       // Drop current jobs only if they are assigned to a different instance, regardless of
       // the jobCfg.isRebalanceRunningTask() setting
-      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
           jobCtx);
     }
     // Go through ALL instances and assign/throttle tasks accordingly
-    for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet()) {
+    for (Map.Entry<String, SortedSet<Integer>> entry : currentInstanceToTaskAssignments.entrySet()) {
       String instance = entry.getKey();
       if (!tgtPartitionAssignments.containsKey(instance)) {
         // There is no assignment made for this instance, so it is safe to skip
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c14cee9..b35252c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -238,21 +238,20 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // These dropping transitions will be prioritized above all task state transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, jobResource,
         currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
         assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache,
         tasksToDrop);
@@ -318,7 +317,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances, jobResource,
           currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
           prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
           taskAssignmentCal, allPartitions, currentTime, liveInstances);
@@ -380,45 +379,24 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
    * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName,
+  protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be dropped (after a
-    // copy-over, the Controller will send a message to drop the state currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds to result and update their states in JobContext in
+    // updatePreviousAssignedTasksStatus method.
     Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
@@ -426,26 +404,14 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), instance);
-        TaskPartitionState currState = TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here because old
-          // Participants, upon connection reset, set tasks' currentStates to INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          result.get(instance).add(pId);
           // Check if this task needs to be dropped. If so, we need to add to tasksToDrop no matter
           // what its current state is so that it will be dropped
+          // This is trying to drop tasks on a reconnected instance with a new sessionId that have
+          // all of their requestedState == DROPPED
           if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) {
             if (!tasksToDrop.containsKey(instance)) {
               tasksToDrop.put(instance, new HashSet<>());
@@ -462,10 +428,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
    * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is
    * deleted) it is added from context. Otherwise, the context won't be updated.
    * @param jobCtx Job Context
-   * @param prevInstanceToTaskAssignments instance -> partitionIds from previous assignment
+   * @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
    */
   protected void updateInstanceToTaskAssignmentsFromContext(JobContext jobCtx,
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments) {
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments) {
     for (Integer partition : jobCtx.getPartitionSet()) {
       // We must add all active task pIds back here
       // The states other than Running and Init do not need to be added.
@@ -474,9 +440,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
           || jobCtx.getPartitionState(partition) == TaskPartitionState.INIT) {
         String instance = jobCtx.getAssignedParticipant(partition);
         if (instance != null) {
-          if (prevInstanceToTaskAssignments.containsKey(instance)
-              && !prevInstanceToTaskAssignments.get(instance).contains(partition)) {
-            prevInstanceToTaskAssignments.get(instance).add(partition);
+          if (currentInstanceToTaskAssignments.containsKey(instance)
+              && !currentInstanceToTaskAssignments.get(instance).contains(partition)) {
+            currentInstanceToTaskAssignments.get(instance).add(partition);
           }
         }
       }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
index ca07e97..58dc2d1 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
@@ -90,8 +90,7 @@ public class TestDropTerminalTasksUponReset {
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
     // Call the static method we are testing
-    JobDispatcher.getPrevInstanceToTaskAssignments(liveInstances, prevAssignment, allTaskPartitions,
-        currentStateOutput, jobName, tasksToDrop);
+    JobDispatcher.getCurrentInstanceToTaskAssignments(liveInstances, currentStateOutput, jobName, tasksToDrop);
 
     // Check that tasksToDrop has (numTasks / 2) partitions as we intended regardless of what the
     // current states of the tasks were


[helix] 10/10: Change the logs to address new changes

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5f6d1eb0efdbd4bb750e0d32863406ca065b1648
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Aug 4 09:42:33 2020 -0700

    Change the logs to address new changes
    
    In this commit, some of the inaccurate logs have been modified.
---
 .../src/main/java/org/apache/helix/task/AssignableInstanceManager.java  | 2 +-
 helix-core/src/main/java/org/apache/helix/task/TaskUtil.java            | 2 +-
 .../java/org/apache/helix/integration/task/TestJobQueueCleanUp.java     | 1 -
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index cca9335..e42659c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -267,7 +267,7 @@ public class AssignableInstanceManager {
       }
     }
     LOG.info(
-        "AssignableInstanceManager built AssignableInstances from scratch based on contexts in TaskDataCache due to Controller switch or ClusterConfig change.");
+        "AssignableInstanceManager built AssignableInstances from scratch based on CurrentState.");
     computeGlobalThreadBasedCapacity();
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index e6e792f..1edc8a6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -1063,7 +1063,7 @@ public class TaskUtil {
 
     for (String workflowName : toBePurgedWorkflows) {
       LOG.warn(
-          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
+          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowContext and IdealState!!",
           workflowName);
 
       // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index ba4fb44..cfa341a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -109,7 +109,6 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     Assert
         .assertTrue(TestHelper.verify(() -> {
           WorkflowConfig config = _driver.getWorkflowConfig(queueName);
-          System.out.println("|Current time: " + System.currentTimeMillis() +" **TEST: " + config.getJobDag().getAllNodes());
           return config.getJobDag().getAllNodes().equals(remainJobs);
         }, TestHelper.WAIT_DURATION));
 


[helix] 02/10: Stabilize TestWorkflowTimeout and TestTaskRebalancer (#991)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f4cfbc74d8cc3a1ca03959c1301233bc014b7452
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Mon May 4 14:30:43 2020 -0700

    Stabilize TestWorkflowTimeout and TestTaskRebalancer (#991)
    
    In this commit, two tests have been stabilized.
    These tests are TestWorkflowTimeout and TestTaskRebalancer.
    These tests were identified as unstable because they rely on Thread.sleep().
---
 .../helix/integration/task/TestTaskRebalancer.java      | 14 +++++++++-----
 .../helix/integration/task/TestWorkflowTimeout.java     | 17 +++++++++++------
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 8a3c252..a09710a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -59,7 +59,7 @@ public class TestTaskRebalancer extends TaskTestBase {
   @Test
   public void testExpiry() throws Exception {
     String jobName = "Expiry";
-    long expiry = 1000;
+    long expiry = 1000L;
     Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(100));
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(commandConfig);
@@ -83,12 +83,16 @@ public class TestTaskRebalancer extends TaskTestBase {
 
     // Wait for job to finish and expire
     _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
-    Thread.sleep(expiry + 100);
+    long finishTime = _driver.getWorkflowContext(jobName).getFinishTime();
 
     // Ensure workflow config and context were cleaned up by now
-    Assert.assertFalse(
-        _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
-    Assert.assertNull(accessor.getProperty(workflowCfgKey));
+    Assert.assertTrue(TestHelper.verify(
+        () -> (!_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+            AccessOption.PERSISTENT) && accessor.getProperty(workflowCfgKey) == null),
+        TestHelper.WAIT_DURATION));
+
+    long cleanUpTime = System.currentTimeMillis();
+    Assert.assertTrue(cleanUpTime - finishTime >= expiry);
   }
 
   private void basic(long jobCompletionTime) throws Exception {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
index 4cf07db..c79913f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
@@ -102,18 +102,23 @@ public class TestWorkflowTimeout extends TaskTestBase {
   }
 
   @Test
-  public void testWorkflowTimeoutWhenWorkflowCompleted() throws InterruptedException {
+  public void testWorkflowTimeoutWhenWorkflowCompleted() throws Exception {
     String workflowName = TestHelper.getTestMethodName();
+    long expiry = 2000L;
     _jobBuilder.setWorkflow(workflowName);
     _jobBuilder.setJobCommandConfigMap(Collections.<String, String> emptyMap());
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
         .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(0).build())
-        .addJob(JOB_NAME, _jobBuilder).setExpiry(2000L);
+        .addJob(JOB_NAME, _jobBuilder).setExpiry(expiry);
 
+    // Since workflow's Timeout is 0, the workflow goes to TIMED_OUT state right away
+    long startTime = System.currentTimeMillis();
     _driver.start(workflowBuilder.build());
-    // Pause the queue
-    Thread.sleep(2500);
-    Assert.assertNull(_driver.getWorkflowConfig(workflowName));
-    Assert.assertNull(_driver.getJobContext(workflowName));
+
+    Assert.assertTrue(TestHelper.verify(() -> (_driver.getWorkflowConfig(workflowName) == null
+        && _driver.getWorkflowContext(workflowName) == null), TestHelper.WAIT_DURATION));
+
+    long cleanUpTime = System.currentTimeMillis();
+    Assert.assertTrue(cleanUpTime - startTime >= expiry);
   }
 }


[helix] 08/10: Quota calculation based on CurrentState (#1165)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 82c4640d0c2ed79ae69804a28cbaeb1e086d4342
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Jul 24 11:12:28 2020 -0700

    Quota calculation based on CurrentState (#1165)
    
    Calculate quota based on CurrentState
    
    In this commit, new methods have been added to AssignableInstanceManager
    which allow the controller to calculate quota based on CurrentState and
    pending messages.
---
 .../WorkflowControllerDataProvider.java            |  11 --
 .../controller/stages/CurrentStateOutput.java      |  14 +-
 .../stages/task/TaskSchedulingStage.java           |   8 +
 .../helix/task/AssignableInstanceManager.java      | 172 +++++++++++++++++++
 .../helix/integration/task/TestStuckTaskQuota.java | 189 +++++++++++++++++++++
 5 files changed, 382 insertions(+), 12 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index d5bc11e..45e1319 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -92,17 +92,6 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
     // Refresh TaskCache
     _taskDataCache.refresh(accessor, getResourceConfigMap());
 
-    // Refresh AssignableInstanceManager
-    AssignableInstanceManager assignableInstanceManager =
-        _taskDataCache.getAssignableInstanceManager();
-
-    // Build from scratch every time
-    assignableInstanceManager.buildAssignableInstances(getClusterConfig(), _taskDataCache,
-        getLiveInstances(), getInstanceConfigMap());
-
-    // TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being
-    assignableInstanceManager.logQuotaProfileJSON(false);
-
     long duration = System.currentTimeMillis() - startTime;
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
         "END: WorkflowControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index dc82a61..a81fd2c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -273,7 +273,7 @@ public class CurrentStateOutput {
   }
 
   /**
-   * Given resource, returns current state map (parition -> instance -> currentState)
+   * Given resource, returns current state map (partition -> instance -> currentState)
    * @param resourceName
    * @return
    */
@@ -338,6 +338,18 @@ public class CurrentStateOutput {
   }
 
   /**
+   * Given resource, returns pending message map (partition -> instance -> message)
+   * @param resourceName
+   * @return
+   */
+  public Map<Partition, Map<String, Message>> getPendingMessageMap(String resourceName) {
+    if (_pendingMessageMap.containsKey(resourceName)) {
+      return _pendingMessageMap.get(resourceName);
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
    * Get the partitions mapped in the current state
    * @param resourceId resource to look up
    * @return set of mapped partitions, or empty set if there are none
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 430de4b..5b4d580 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -76,6 +76,14 @@ public class TaskSchedulingStage extends AbstractBaseStage {
           "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache");
     }
 
+
+    // Build quota capacity based on Current State and Pending Messages
+    cache.getAssignableInstanceManager().buildAssignableInstancesFromCurrentState(
+        cache.getClusterConfig(), cache.getTaskDataCache(), cache.getLiveInstances(), cache.getInstanceConfigMap(),
+        currentStateOutput, resourceMap);
+
+    cache.getAssignableInstanceManager().logQuotaProfileJSON(false);
+
     // Reset current INIT/RUNNING tasks on participants for throttling
     cache.resetActiveTaskCount(currentStateOutput);
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index eb966ac..cca9335 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -27,9 +27,13 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
 import org.apache.helix.task.assigner.AssignableInstance;
 import org.apache.helix.task.assigner.TaskAssignResult;
 import org.codehaus.jackson.JsonNode;
@@ -177,6 +181,174 @@ public class AssignableInstanceManager {
   }
 
   /**
+   * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from
+   * CurrentState. It re-computes current quota profile for each AssignableInstance.
+   * If a task's current state is INIT or RUNNING, or if there is a pending message whose ToState
+   * is RUNNING, the task/partition will be assigned to the AssignableInstance of the instance.
+   * @param clusterConfig
+   * @param taskDataCache
+   * @param liveInstances
+   * @param instanceConfigs
+   * @param currentStateOutput
+   * @param resourceMap
+   */
+  public void buildAssignableInstancesFromCurrentState(ClusterConfig clusterConfig,
+      TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances,
+      Map<String, InstanceConfig> instanceConfigs, CurrentStateOutput currentStateOutput,
+      Map<String, Resource> resourceMap) {
+    _assignableInstanceMap.clear();
+    _taskAssignResultMap.clear();
+
+    // Create all AssignableInstance objects based on what's in liveInstances
+    for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
+      // Prepare instance-specific metadata
+      String instanceName = liveInstanceEntry.getKey();
+      LiveInstance liveInstance = liveInstanceEntry.getValue();
+      if (!instanceConfigs.containsKey(instanceName)) {
+        continue; // Ill-formatted input; skip over this instance
+      }
+      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
+
+      // Create an AssignableInstance
+      AssignableInstance assignableInstance =
+          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+      _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
+      LOG.debug("AssignableInstance created for instance: {}", instanceName);
+    }
+
+    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
+
+    // Update task profiles by traversing all CurrentStates
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+      String resourceName = resourceEntry.getKey();
+      if (resourceEntry.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
+        JobConfig jobConfig = jobConfigMap.get(resourceName);
+        JobContext jobContext = taskDataCache.getJobContext(resourceName);
+        String quotaType = getQuotaType(jobConfig);
+        Map<Partition, Map<String, String>> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceName);
+        for (Map.Entry<Partition, Map<String, String>> currentStateMapEntry : currentStateMap
+            .entrySet()) {
+          Partition partition = currentStateMapEntry.getKey();
+          String taskId = getTaskID(jobConfig, jobContext, partition);
+          for (Map.Entry<String, String> instanceCurrentStateEntry : currentStateMapEntry.getValue()
+              .entrySet()) {
+            String assignedInstance = instanceCurrentStateEntry.getKey();
+            String taskState = instanceCurrentStateEntry.getValue();
+            // If a task is in INIT or RUNNING state on the instance, this task should occupy one
+            // quota from this instance.
+            if (taskState.equals(TaskPartitionState.INIT.name())
+                || taskState.equals(TaskPartitionState.RUNNING.name())) {
+              assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
+            }
+          }
+        }
+        Map<Partition, Map<String, Message>> pendingMessageMap =
+            currentStateOutput.getPendingMessageMap(resourceName);
+        for (Map.Entry<Partition, Map<String, Message>> pendingMessageMapEntry : pendingMessageMap
+            .entrySet()) {
+          Partition partition = pendingMessageMapEntry.getKey();
+          String taskId = getTaskID(jobConfig, jobContext, partition);
+          for (Map.Entry<String, Message> instancePendingMessageEntry : pendingMessageMapEntry
+              .getValue().entrySet()) {
+            String assignedInstance = instancePendingMessageEntry.getKey();
+            String messageToState = instancePendingMessageEntry.getValue().getToState();
+            // If there is a pending message on the instance which has ToState of RUNNING, the task
+            // will run on the instance soon. So the task needs to occupy one quota on this instance.
+            if (messageToState.equals(TaskPartitionState.RUNNING.name())
+                && !TaskPartitionState.INIT.name().equals(
+                    currentStateOutput.getCurrentState(resourceName, partition, assignedInstance))
+                && !TaskPartitionState.RUNNING.name().equals(currentStateOutput
+                    .getCurrentState(resourceName, partition, assignedInstance))) {
+              assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
+            }
+          }
+        }
+      }
+    }
+    LOG.info(
+        "AssignableInstanceManager built AssignableInstances from scratch based on contexts in TaskDataCache due to Controller switch or ClusterConfig change.");
+    computeGlobalThreadBasedCapacity();
+  }
+
+  /**
+   * Assign the task to the instance's Assignable Instance
+   * @param instance
+   * @param jobConfig
+   * @param taskId
+   * @param quotaType
+   */
+  private void assignTaskToInstance(String instance, JobConfig jobConfig, String taskId,
+      String quotaType) {
+    if (_assignableInstanceMap.containsKey(instance)) {
+      TaskConfig taskConfig = getTaskConfig(jobConfig, taskId);
+      AssignableInstance assignableInstance = _assignableInstanceMap.get(instance);
+      TaskAssignResult taskAssignResult =
+          assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
+      if (taskAssignResult.isSuccessful()) {
+        _taskAssignResultMap.put(taskId, taskAssignResult);
+        LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
+            instance);
+      }
+    } else {
+      LOG.debug(
+          "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
+              + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
+              + "up for this task. TaskId: {}, Instance: {}",
+          taskId, instance);
+    }
+  }
+
+  /**
+   * Extract the quota type information of the Job
+   * @param jobConfig
+   * @return
+   */
+  private String getQuotaType(JobConfig jobConfig) {
+    // If jobConfig is null (job has been deleted but participant has not dropped the task yet), use
+    // default quota for the task
+    if (jobConfig == null || jobConfig.getJobType() == null) {
+      return AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+    return jobConfig.getJobType();
+  }
+
+  /**
+   * Calculate the TaskID based on the JobConfig and JobContext information
+   * @param jobConfig
+   * @param jobContext
+   * @param partition
+   * @return
+   */
+  private String getTaskID(JobConfig jobConfig, JobContext jobContext, Partition partition) {
+    if (jobConfig == null || jobContext == null) {
+      // If JobConfig or JobContext is null, use the partition name
+      return partition.getPartitionName();
+    }
+    int taskIndex = TaskUtil.getPartitionId(partition.getPartitionName());
+    String taskId = jobContext.getTaskIdForPartition(taskIndex);
+    if (taskId == null) {
+      // For targeted tasks, taskId will be null
+      // We instead use pName (see FixedTargetTaskAssignmentCalculator)
+      taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
+    }
+    return taskId;
+  }
+
+  /**
+   * Returns the task config of a task based on the JobConfig information
+   * @param jobConfig
+   * @param taskId
+   * @return
+   */
+  private TaskConfig getTaskConfig (JobConfig jobConfig, String taskId) {
+    if (jobConfig == null){
+      return new TaskConfig(null, null, taskId, null);
+    }
+    return jobConfig.getTaskConfig(taskId);
+  }
+
+  /**
    * Updates AssignableInstances when there are changes in LiveInstances or InstanceConfig. This
    * update only keeps an up-to-date count of AssignableInstances and does NOT re-build tasks
    * (because it's costly).
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
new file mode 100644
index 0000000..a118c6b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
@@ -0,0 +1,189 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+
+public class TestStuckTaskQuota extends TaskTestBase {
+  private CountDownLatch latch = new CountDownLatch(1);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 2;
+    super.beforeClass();
+
+    // Stop participants that have been started in super class
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+    _participants = new MockParticipantManager[_numNodes];
+
+    // Start first participant
+    startParticipantAndRegisterNewMockTask(0);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testStuckTaskQuota() throws Exception {
+    String workflowName1 = TestHelper.getTestMethodName() + "_1";
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    String workflowName3 = TestHelper.getTestMethodName() + "_3";
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder1 =
+        new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(40)
+            .setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    JobConfig.Builder jobBuilder2 = new JobConfig.Builder().setWorkflow(workflowName2)
+        .setNumberOfTasks(1).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    JobConfig.Builder jobBuilder3 = new JobConfig.Builder().setWorkflow(workflowName3)
+        .setNumberOfTasks(1).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+    Workflow.Builder workflowBuilder2 =
+        new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+    Workflow.Builder workflowBuilder3 =
+        new Workflow.Builder(workflowName3).addJob(jobName, jobBuilder3);
+
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure the JOB0 of workflow1 is started and all of the tasks are assigned to the
+    // participant 0
+    _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+        TaskState.IN_PROGRESS);
+
+    String participant0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    for (int i = 0; i < 40; i++) {
+      int finalI = i;
+      Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING
+          .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName))
+              .getPartitionState(finalI))
+          && participant0
+              .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName))
+                  .getAssignedParticipant(finalI))),
+          TestHelper.WAIT_DURATION));
+    }
+
+    // Start the second participant
+    startParticipantAndRegisterNewMockTask(1);
+
+    _driver.start(workflowBuilder2.build());
+    // Make sure the JOB0 of workflow2 is started and the only task of this job is assigned to
+    // participant1
+    _driver.pollForJobState(workflowName2, TaskUtil.getNamespacedJobName(workflowName2, jobName),
+        TaskState.IN_PROGRESS);
+    String participant1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
+    Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING.equals(_driver
+        .getJobContext(TaskUtil.getNamespacedJobName(workflowName2, jobName)).getPartitionState(0))
+        && participant1
+            .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName2, jobName))
+                .getAssignedParticipant(0))),
+        TestHelper.WAIT_DURATION));
+
+    // Delete the workflow1
+    _driver.delete(workflowName1);
+
+    // Since the tasks will be stuck for workflow1 after the deletion, the participant 0 is out of
+    // capacity. Hence, the new tasks should be assigned to participant 1
+    _driver.start(workflowBuilder3.build());
+
+    // Make sure the JOB0 of workflow3 is started and the only task of this job is assigned to
+    // participant1
+    _driver.pollForJobState(workflowName3, TaskUtil.getNamespacedJobName(workflowName3, jobName),
+        TaskState.IN_PROGRESS);
+
+    Assert.assertTrue(TestHelper
+        .verify(() -> (TaskPartitionState.RUNNING
+            .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName3, jobName))
+                .getPartitionState(0))),
+            TestHelper.WAIT_DURATION)
+        && participant1
+            .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName3, jobName))
+                .getAssignedParticipant(0)));
+    latch.countDown();
+    // Stop the workflow2 and workflow3
+    _driver.waitToStop(workflowName2, 5000L);
+    _driver.waitToStop(workflowName3, 5000L);
+  }
+
+  private void startParticipantAndRegisterNewMockTask(int participantIndex) {
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);
+    String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + participantIndex);
+    _participants[participantIndex] =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+    // Register a Task state model factory.
+    StateMachineEngine stateMachine = _participants[participantIndex].getStateMachineEngine();
+    stateMachine.registerStateModelFactory("Task",
+        new TaskStateModelFactory(_participants[participantIndex], taskFactoryReg));
+    _participants[participantIndex].syncStart();
+  }
+
+  /**
+   * A mock task that extends MockTask and blocks in cancel() until the test releases the latch.
+   */
+  private class NewMockTask extends MockTask {
+
+    NewMockTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        latch.await();
+      } catch (Exception e) {
+        // Pass
+      }
+      super.cancel();
+    }
+  }
+}


[helix] 04/10: Remove previousAssignment in processTaskWithPendingMessage method (#1040)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c122577bd7d5e5e90b4db2db2629e115331d39e7
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Thu Jun 4 10:52:54 2020 -0700

    Remove previousAssignment in processTaskWithPendingMessage method (#1040)
    
    Remove previousAssignment in processTaskWithPendingMessage method
    
    The processTaskWithPendingMessage method is relying on the previousAssignment.
    In this commit, this method has been modified and previousAssignment has been
    replaced with currentState.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  |  74 +++++------
 .../java/org/apache/helix/task/JobDispatcher.java  |   7 +-
 ...eviousAssignedTaskStatusWithPendingMessage.java | 148 +++++++++++++++++++++
 3 files changed, 183 insertions(+), 46 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 8934337..fa12203 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -66,9 +66,9 @@ public abstract class AbstractTaskDispatcher {
   // Job Update related methods
 
   public void updatePreviousAssignedTasksStatus(
-      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
-      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
-      ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+      Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
+      JobContext jobCtx, JobConfig jobCfg, TaskState jobState,
       Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
       Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
       Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
@@ -130,8 +130,8 @@ public abstract class AbstractTaskDispatcher {
           // If there is a pending message whose destination state is different from the current
           // state, just make the same assignment as the pending message. This is essentially
           // "waiting" until this state transition is complete
-          processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
-              pendingMessage, jobState, currState, paMap, assignedPartitions);
+          processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState,
+              paMap, assignedPartitions);
           continue;
         }
 
@@ -300,6 +300,9 @@ public abstract class AbstractTaskDispatcher {
             // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
             partitionsToDropFromIs.add(pId);
 
+            assignedPartitions.get(instance).add(pId);
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+
             // Also release resources for these tasks
             assignableInstanceManager.release(instance, taskConfig, quotaType);
             break;
@@ -439,7 +442,6 @@ public abstract class AbstractTaskDispatcher {
   /**
    * Create an assignment based on an already-existing pending message. This effectively lets the
    * Controller to "wait" until the pending state transition has been processed.
-   * @param prevAssignment
    * @param pId
    * @param pName
    * @param instance
@@ -449,43 +451,31 @@ public abstract class AbstractTaskDispatcher {
    * @param paMap
    * @param assignedPartitions
    */
-  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
-      String pName, String instance, Message pendingMessage, TaskState jobState,
-      TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
-      Map<String, Set<Integer>> assignedPartitions) {
-
-    // stateMap is a mapping of Instance -> TaskPartitionState (String)
-    Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-    if (stateMap != null) {
-      String prevState = stateMap.get(instance);
-      if (!pendingMessage.getToState().equals(prevState)) {
-        LOG.warn(String.format(
-            "Task pending to-state is %s while previous assigned state is %s. This should not"
-                + "happen.",
-            pendingMessage.getToState(), prevState));
+  private void processTaskWithPendingMessage(Integer pId, String pName, String instance,
+      Message pendingMessage, TaskState jobState, TaskPartitionState currState,
+      Map<Integer, PartitionAssignment> paMap, Map<String, Set<Integer>> assignedPartitions) {
+
+    if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT
+        && pendingMessage.getToState().equals(TaskPartitionState.RUNNING.name())) {
+      // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
+      // so that Helix will cancel the transition.
+      paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
+      assignedPartitions.get(instance).add(pId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Task partition %s has a pending state transition on instance %s INIT->RUNNING. CurrentState is %s "
+                + "Setting it back to INIT so that Helix can cancel the transition(if enabled).",
+            pName, instance, currState.name()));
       }
-      if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT
-          && prevState.equals(TaskPartitionState.RUNNING.name())) {
-        // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
-        // so that Helix will cancel the transition.
-        paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
-        assignedPartitions.get(instance).add(pId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format(
-              "Task partition %s has a pending state transition on instance %s INIT->RUNNING. Previous state %s"
-                  + "Setting it back to INIT so that Helix can cancel the transition(if enabled).",
-              pName, instance, prevState));
-        }
-      } else {
-        // Otherwise, Just copy forward
-        // the state assignment from the previous ideal state.
-        paMap.put(pId, new PartitionAssignment(instance, prevState));
-        assignedPartitions.get(instance).add(pId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format(
-              "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-              pName, instance, prevState));
-        }
+    } else {
+      // Otherwise, Just copy forward
+      // the state assignment from the pending message
+      paMap.put(pId, new PartitionAssignment(instance, pendingMessage.getToState()));
+      assignedPartitions.get(instance).add(pId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Task partition %s has a pending state transition on instance %s. Using the pending message ToState which was %s.",
+            pName, instance, pendingMessage.getToState()));
       }
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index b35252c..10a1b7c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -251,10 +251,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, jobResource,
-        currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
-        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache,
-        tasksToDrop);
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
+        jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
+        partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);
 
     addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java b/helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java
new file mode 100644
index 0000000..f9127e1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java
@@ -0,0 +1,148 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * This test checks the scheduling decision for the task that has already been assigned to an
+ * instance and there exists a message pending for that task.
+ */
+public class TestUpdatePreviousAssignedTaskStatusWithPendingMessage {
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String INSTANCE_NAME = "TestInstance";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int PARTITION_ID = 0;
+
+  /**
+   * Scenario:
+   * JobState = TIMING_OUT
+   * Task State: Context= INIT, CurrentState = INIT
+   * Pending Message: FromState = INIT, ToState = RUNNING
+   */
+  @Test
+  public void testTaskWithPendingMessageWhileJobTimingOut() {
+    JobDispatcher jobDispatcher = new JobDispatcher();
+    // Preparing the inputs
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments = new HashMap<>();
+    SortedSet<Integer> tasks = new TreeSet<>();
+    tasks.add(PARTITION_ID);
+    currentInstanceToTaskAssignments.put(INSTANCE_NAME, tasks);
+    Map<Integer, AbstractTaskDispatcher.PartitionAssignment> paMap = new TreeMap<>();
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.INIT,
+        TaskPartitionState.INIT, TaskPartitionState.RUNNING);
+    JobContext jobContext = prepareJobContext(TaskPartitionState.INIT);
+    JobConfig jobConfig = prepareJobConfig();
+    Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
+    tasksToDrop.put(INSTANCE_NAME, new HashSet<>());
+    WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider();
+    jobDispatcher.updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments,
+        new HashSet<>(), JOB_NAME, currentStateOutput, jobContext, jobConfig, TaskState.TIMING_OUT,
+        new HashMap<>(), new HashSet<>(), paMap, TargetState.STOP, new HashSet<>(), cache,
+        tasksToDrop);
+    Assert.assertEquals(paMap.get(0)._state, TaskPartitionState.INIT.name());
+  }
+
+  /**
+   * Scenario:
+   * JobState = IN_PROGRESS
+   * Task State: Context= RUNNING, CurrentState = RUNNING
+   * Pending Message: FromState = RUNNING, ToState = DROPPED
+   */
+  @Test
+  public void testTaskWithPendingMessage() {
+    JobDispatcher jobDispatcher = new JobDispatcher();
+    // Preparing the inputs
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments = new HashMap<>();
+    SortedSet<Integer> tasks = new TreeSet<>();
+    tasks.add(PARTITION_ID);
+    currentInstanceToTaskAssignments.put(INSTANCE_NAME, tasks);
+    Map<Integer, AbstractTaskDispatcher.PartitionAssignment> paMap = new TreeMap<>();
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
+        TaskPartitionState.RUNNING, TaskPartitionState.DROPPED);
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING);
+    JobConfig jobConfig = prepareJobConfig();
+    Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
+    tasksToDrop.put(INSTANCE_NAME, new HashSet<>());
+    WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider();
+    jobDispatcher.updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments,
+        new HashSet<>(), JOB_NAME, currentStateOutput, jobContext, jobConfig, TaskState.IN_PROGRESS,
+        new HashMap<>(), new HashSet<>(), paMap, TargetState.START, new HashSet<>(), cache,
+        tasksToDrop);
+    Assert.assertEquals(paMap.get(0)._state, TaskPartitionState.DROPPED.name());
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    return jobConfigBuilder.build();
+  }
+
+  private JobContext prepareJobContext(TaskPartitionState taskPartitionState) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setStartTime(0L);
+    jobContext.setPartitionState(PARTITION_ID, taskPartitionState);
+    jobContext.setPartitionTarget(PARTITION_ID, TARGET_RESOURCES + "_0");
+    return jobContext;
+  }
+
+  private CurrentStateOutput prepareCurrentState(TaskPartitionState currentState,
+      TaskPartitionState messageFromState, TaskPartitionState messageToState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, INSTANCE_NAME, currentState.name());
+    Message message = new Message(Message.MessageType.STATE_TRANSITION, "123456789");
+    message.setFromState(messageFromState.name());
+    message.setToState(messageToState.name());
+    currentStateOutput.setPendingMessage(JOB_NAME, taskPartition, INSTANCE_NAME, message);
+    return currentStateOutput;
+  }
+}


[helix] 06/10: Remove previousAssignment read/write to ZK (#1074)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f11243f693eee572c2e681ef4a3feafd3d2dd88d
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jun 17 11:26:27 2020 -0700

    Remove previousAssignment read/write to ZK (#1074)
    
    Remove previousAssignment read/write to ZK
    
    In this commit, previousAssignment has been removed from the codebase,
    and the controller no longer reads/writes previousAssignment from/to ZK.
---
 .../apache/helix/common/caches/TaskDataCache.java  | 50 ++--------------------
 .../java/org/apache/helix/task/JobDispatcher.java  | 16 ++-----
 .../org/apache/helix/task/WorkflowDispatcher.java  |  5 +--
 .../task/TestTaskSchedulingTwoCurrentStates.java   | 13 ------
 .../helix/integration/task/TestTaskStopQueue.java  | 10 +----
 .../helix/task/TestTargetedTaskStateChange.java    | 25 +----------
 6 files changed, 11 insertions(+), 108 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index adb1c54..f6f6a72 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -59,9 +59,6 @@ public class TaskDataCache extends AbstractDataCache {
   // TODO: context and previous assignment should be wrapped into a class. Otherwise, int the future,
   // concurrency will be hard to handle.
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
-  private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>();
-  private Set<String> _prevAssignmentToUpdate = new HashSet<>();
-  private Set<String> _prevAssignmentToRemove = new HashSet<>();
   private Set<String> _contextToUpdate = new HashSet<>();
   private Set<String> _contextToRemove = new HashSet<>();
   // The following fields have been added for quota-based task scheduling
@@ -72,8 +69,7 @@ public class TaskDataCache extends AbstractDataCache {
   private Set<String> _dispatchedJobs = new HashSet<>();
 
   private enum TaskDataType {
-    CONTEXT,
-    PREV_ASSIGNMENT
+    CONTEXT
   }
 
 
@@ -99,7 +95,7 @@ public class TaskDataCache extends AbstractDataCache {
    */
   public synchronized boolean refresh(HelixDataAccessor accessor,
       Map<String, ResourceConfig> resourceConfigMap) {
-    refreshContextsAndPreviousAssignments(accessor);
+    refreshContexts(accessor);
     // update workflow and job configs.
     _workflowConfigMap.clear();
     Map<String, JobConfig> newJobConfigs = new HashMap<>();
@@ -175,11 +171,10 @@ public class TaskDataCache extends AbstractDataCache {
     return true;
   }
 
-  private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) {
+  private void refreshContexts(HelixDataAccessor accessor) {
     // TODO: Need an optimize for reading context only if the refresh is needed.
     long start = System.currentTimeMillis();
     _contextMap.clear();
-    _prevAssignmentMap.clear();
     if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName()
         .equalsIgnoreCase(UNKNOWN_CLUSTER)) {
       return;
@@ -187,22 +182,15 @@ public class TaskDataCache extends AbstractDataCache {
     String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(),
         PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT);
     List<String> contextPaths = new ArrayList<>();
-    List<String> prevAssignmentPaths = new ArrayList<>();
     List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
     if (childNames == null) {
       return;
     }
     for (String resourceName : childNames) {
       contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
-      //Workflow does not have previous assignment
-      if (!_workflowConfigMap.containsKey(resourceName)) {
-        prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT));
-      }
     }
-
+    
     List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true);
-    List<ZNRecord> prevAssignments =
-        accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0, true);
 
     for (int i = 0; i < contexts.size(); i++) {
       ZNRecord context = contexts.get(i);
@@ -215,12 +203,6 @@ public class TaskDataCache extends AbstractDataCache {
       }
     }
 
-    for (ZNRecord prevAssignment : prevAssignments) {
-      if (prevAssignment != null) {
-        _prevAssignmentMap.put(prevAssignment.getId(), prevAssignment);
-      }
-    }
-
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, genEventInfo(),
           "# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
@@ -325,13 +307,6 @@ public class TaskDataCache extends AbstractDataCache {
         TaskDataType.CONTEXT);
     batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT);
     _contextToRemove.clear();
-
-    _prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove);
-    batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap,
-        _prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT);
-    batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove),
-        TaskDataType.PREV_ASSIGNMENT);
-    _prevAssignmentToRemove.clear();
   }
 
   private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames,
@@ -427,8 +402,6 @@ public class TaskDataCache extends AbstractDataCache {
     switch (taskDataType) {
     case CONTEXT:
       return String.format("%s/%s", prevFix, TaskConstants.CONTEXT_NODE);
-    case PREV_ASSIGNMENT:
-      return String.format("%s/%s", prevFix, TaskConstants.PREV_RA_NODE);
     }
     return null;
   }
@@ -451,19 +424,4 @@ public class TaskDataCache extends AbstractDataCache {
     }
     return null;
   }
-
-  public ResourceAssignment getPreviousAssignment(String resourceName) {
-    return _prevAssignmentMap.get(resourceName) != null ? new ResourceAssignment(
-        _prevAssignmentMap.get(resourceName)) : null;
-  }
-
-  public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) {
-    _prevAssignmentMap.put(resourceName, prevAssignment.getRecord());
-    _prevAssignmentToUpdate.add(resourceName);
-  }
-
-  public void removePrevAssignment(String resourceName) {
-    _prevAssignmentMap.remove(resourceName);
-    _prevAssignmentToRemove.add(resourceName);
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 191a2ea..c2b724b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -133,17 +133,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
     }
 
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment =
-        _dataProvider.getTaskDataCache().getPreviousAssignment(jobName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(jobName);
-    }
-
     // Will contain the list of partitions that must be explicitly dropped from the ideal state that
     // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
     Set<String> liveInstances =
         jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
             : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
@@ -191,7 +182,6 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
     _dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
-    _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment);
 
     LOG.debug("Job " + jobName + " new assignment "
         + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
@@ -382,7 +372,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
-   * @return instance -> partitionIds from previous assignment, if the instance is still live
+   * @return instance -> partitionIds from currentState, if the instance is still live
    */
   protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
       Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
@@ -424,8 +414,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
   }
 
   /**
-   * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is
-   * deleted) it is added from context. Otherwise, the context won't be updated.
+   * If partition is missing from prevInstanceToTaskAssignments it is added from context. Otherwise,
+   * the context won't be updated.
    * @param jobCtx Job Context
    * @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
    */
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 4c143d2..53be558 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -588,7 +588,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         // Only remove from cache when remove all workflow success. Otherwise, batch write will
         // clean all the contexts even if Configs and IdealStates are exists. Then all the workflows
         // and jobs will rescheduled again.
-        removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
+        removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
       }
     } else {
       LOG.info("Did not clean up workflow " + workflow
@@ -596,12 +596,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     }
   }
 
-  private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs,
+  private void removeContexts(String workflow, Set<String> jobs,
       TaskDataCache cache) {
     if (jobs != null) {
       for (String job : jobs) {
         cache.removeContext(job);
-        cache.removePrevAssignment(job);
       }
     }
     cache.removeContext(workflow);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
index fe9cb3c..bb970c7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -35,15 +35,12 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
@@ -185,16 +182,6 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
     }, TestHelper.WAIT_DURATION);
     Assert.assertTrue(isCurrentStateCreated);
 
-    String previousAssignmentPath = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
-        + namespacedJobName + "/PreviousResourceAssignment";
-    ResourceAssignment prevAssignment = new ResourceAssignment(namespacedJobName);
-    Map<String, String> replicaMap = new HashMap<>();
-    replicaMap.put(instanceP0, TaskPartitionState.RUNNING.name());
-    Partition taskPartition = new Partition(namespacedJobName + "_0");
-    prevAssignment.addReplicaMap(taskPartition, replicaMap);
-    _manager.getHelixDataAccessor().getBaseDataAccessor().set(previousAssignmentPath,
-        prevAssignment.getRecord(), AccessOption.PERSISTENT);
-
     // Wait until the job is finished.
     _driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.COMPLETED);
     Assert.assertEquals(CANCEL_COUNT.get(), 0);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
index add5687..f9ae845 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
@@ -33,8 +33,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
- * This test makes sure the workflow can be stopped if previousAssignment and currentState are
- * deleted.
+ * This test makes sure the workflow can be stopped if currentState is deleted.
  */
 public class TestTaskStopQueue extends TaskTestBase {
   private static final long TIMEOUT = 200000L;
@@ -75,13 +74,6 @@ public class TestTaskStopQueue extends TaskTestBase {
           .exists(currentStatePath, AccessOption.PERSISTENT));
     }
 
-    String previousAssignment = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
-        + namespacedJobName + "/PreviousResourceAssignment";
-    _manager.getHelixDataAccessor().getBaseDataAccessor().remove(previousAssignment,
-        AccessOption.PERSISTENT);
-    Assert.assertFalse(_manager.getHelixDataAccessor().getBaseDataAccessor()
-        .exists(previousAssignment, AccessOption.PERSISTENT));
-
     // Start the Controller
     String controllerName = CONTROLLER_PREFIX + "_1";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index 3913c2d..b79dcb9 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -20,7 +20,6 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -77,7 +75,6 @@ public class TestTargetedTaskStateChange {
    * different instances.
    * Scenario:
    * Instance0: Slave, Instance1: Master, Instance2: Slave
-   * PreviousAssignment of Task: Instance0: Running
    * CurrentState: Instance0: Running, Instance1: Running
    * Expected paMap: Instance0 -> Dropped
    */
@@ -91,8 +88,6 @@ public class TestTargetedTaskStateChange {
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
     when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
     when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
-    when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
-        .thenReturn(mock._resourceAssignment);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -114,12 +109,9 @@ public class TestTargetedTaskStateChange {
   }
 
   /**
-   * This test checks the behaviour of the controller while there is one current state which is
-   * different from
-   * Previous Assignment information.
+   * This test checks the behaviour of the controller while there is one current state.
    * Scenario:
    * Instance0: Slave, Instance1: Master, Instance2: Slave
-   * PreviousAssignment of Task: Instance0: Dropped
    * CurrentState: Instance0: Running
    * Expected paMap: Instance1 -> Running
    */
@@ -133,8 +125,6 @@ public class TestTargetedTaskStateChange {
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
     when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
     when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
-    when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
-        .thenReturn(mock._resourceAssignment2);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -291,15 +281,6 @@ public class TestTargetedTaskStateChange {
     return currentStateOutput;
   }
 
-  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
-    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
-    Map<String, String> replicaMap = new HashMap<>();
-    replicaMap.put(instance, state);
-    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
-    prevAssignment.addReplicaMap(taskPartition, replicaMap);
-    return prevAssignment;
-  }
-
   private class MockTestInformation {
     private static final String SLAVE_INSTANCE = INSTANCE_PREFIX + "0";
     private static final String MASTER_INSTANCE = INSTANCE_PREFIX + "1";
@@ -316,10 +297,6 @@ public class TestTargetedTaskStateChange {
         SLAVE_INSTANCE, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name());
     private CurrentStateOutput _currentStateOutput2 =
         prepareCurrentState2(MASTER_INSTANCE, TaskPartitionState.RUNNING.name());
-    private ResourceAssignment _resourceAssignment =
-        preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.RUNNING.name());
-    private ResourceAssignment _resourceAssignment2 =
-        preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.DROPPED.name());
     private TaskDataCache _taskDataCache = mock(TaskDataCache.class);
     private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);
 


[helix] 07/10: Respect Maximum Number Of Attempts for the tasks (#1142)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d39b4561080066993e04f54e83cba0cbcb9323be
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Jul 21 13:01:47 2020 -0700

    Respect Maximum Number Of Attempts for the tasks (#1142)
    
    In this commit, several parts of the scheduling logic have been changed
    to make the scheduler respect the maximum number of attempts for
    the tasks.
    
    Also, it has been observed that when a task is dropped and
    scheduled again, the max number of attempts is not being respected.
    In this commit, further checks are added to avoid scheduling the
    task again once it reaches its maximum number of attempts.
---
 .../WorkflowControllerDataProvider.java            |  18 +--
 .../apache/helix/task/AbstractTaskDispatcher.java  |  49 +++++--
 .../java/org/apache/helix/task/JobDispatcher.java  |   2 +-
 .../integration/task/TestForceDeleteWorkflow.java  |   6 +-
 .../task/TestMaxNumberOfAttemptsMasterSwitch.java  | 152 +++++++++++++++++++++
 .../helix/integration/task/TestStopWorkflow.java   |   6 +-
 .../helix/task/TestTargetedTaskStateChange.java    |   4 +-
 7 files changed, 205 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 1032417..d5bc11e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -58,7 +58,7 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
 
   // For detecting live instance and target resource partition state change in task assignment
   // Used in AbstractTaskDispatcher
-  private boolean _existsLiveInstanceOrCurrentStateChange = false;
+  private boolean _existsLiveInstanceOrCurrentStateOrMessageChange = false;
 
   public WorkflowControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER);
@@ -71,12 +71,14 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
   }
 
   private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> propertyRefreshed) {
-    // This is for targeted jobs' task assignment. It needs to watch for current state changes for
-    // when targeted resources' state transitions complete
-    _existsLiveInstanceOrCurrentStateChange =
+    // This is for targeted jobs' task assignment. It needs to watch for current state or message
+    // changes for when targeted resources' state transitions complete
+    _existsLiveInstanceOrCurrentStateOrMessageChange =
         // TODO read and update CURRENT_STATE in the BaseControllerDataProvider as well.
-        // This check (and set) is necessary for now since the current state flag in _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
+        // This check (and set) is necessary for now since the current state flag in
+        // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
         _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
             || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
             || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
@@ -119,7 +121,7 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
-    _existsLiveInstanceOrCurrentStateChange = true;
+    _existsLiveInstanceOrCurrentStateOrMessageChange = true;
     super.setLiveInstances(liveInstances);
   }
 
@@ -257,8 +259,8 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
    * task-assigning in AbstractTaskDispatcher.
    * @return
    */
-  public boolean getExistsLiveInstanceOrCurrentStateChange() {
-    return _existsLiveInstanceOrCurrentStateChange;
+  public boolean getExistsLiveInstanceOrCurrentStateOrMessageChange() {
+    return _existsLiveInstanceOrCurrentStateOrMessageChange;
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index ffbdcef..904ecbd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -74,6 +75,12 @@ public abstract class AbstractTaskDispatcher {
       Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
       Map<String, Set<Integer>> tasksToDrop) {
 
+    // If a job is in one of the following states and its tasks are in RUNNING states, the tasks
+    // will be aborted.
+    Set<TaskState> jobStatesForAbortingTasks =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED));
+
     // Get AssignableInstanceMap for releasing resources for tasks in terminal states
     AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
 
@@ -185,17 +192,11 @@ public abstract class AbstractTaskDispatcher {
         switch (currState) {
         case RUNNING: {
           TaskPartitionState nextState = TaskPartitionState.RUNNING;
-          if (jobState == TaskState.TIMING_OUT) {
+          if (jobStatesForAbortingTasks.contains(jobState)) {
             nextState = TaskPartitionState.TASK_ABORTED;
           } else if (jobTgtState == TargetState.STOP) {
             nextState = TaskPartitionState.STOPPED;
-          } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
-              || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
-            // Drop tasks if parent job is not in progress
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-            break;
           }
-
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
           assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
@@ -548,8 +549,8 @@ public abstract class AbstractTaskDispatcher {
       Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
 
     // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
-    boolean existsLiveInstanceOrCurrentStateChange =
-        cache.getExistsLiveInstanceOrCurrentStateChange();
+    boolean existsLiveInstanceOrCurrentStateOrMessageChangeChange =
+        cache.getExistsLiveInstanceOrCurrentStateOrMessageChange();
 
     // The excludeSet contains the set of task partitions that must be excluded from consideration
     // when making any new assignments.
@@ -560,7 +561,7 @@ public abstract class AbstractTaskDispatcher {
       excludeSet.addAll(assignedSet);
     }
     addCompletedTasks(excludeSet, jobCtx, allPartitions);
-    addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+    addPartitionsReachedMaximumRetries(excludeSet, jobCtx, allPartitions, jobCfg);
     excludeSet.addAll(skippedPartitions);
     Set<Integer> partitionsWithDelay = TaskUtil.getNonReadyPartitions(jobCtx, currentTime);
     excludeSet.addAll(partitionsWithDelay);
@@ -576,7 +577,8 @@ public abstract class AbstractTaskDispatcher {
     Set<Integer> partitionsToRetryOnLiveInstanceChangeForTargetedJob = new HashSet<>();
     // If the job is a targeted job, in case of live instance change, we need to assign
     // non-terminal tasks so that they could be re-scheduled
-    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+    if (!TaskUtil.isGenericTaskJob(jobCfg)
+        && existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
       // This job is a targeted job, so FixedAssignmentCalculator will be used
       // There has been a live instance change. Must re-add incomplete task partitions to be
       // re-assigned and re-scheduled
@@ -612,7 +614,8 @@ public abstract class AbstractTaskDispatcher {
     }
 
     // If this is a targeted job and if there was a live instance change
-    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+    if (!TaskUtil.isGenericTaskJob(jobCfg)
+        && existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
       // Drop current jobs only if they are assigned to a different instance, regardless of
       // the jobCfg.isRebalanceRunningTask() setting
       dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
@@ -745,8 +748,12 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-  // add all partitions that have been tried maxNumberAttempts
-  protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx,
+  // Add all partitions/tasks that cannot be retried. These tasks are:
+  // 1- Tasks in TASK_ABORTED or ERROR state.
+  // 2- Tasks that have gone to TIMED_OUT, TASK_ERROR or DROPPED state and have reached their
+  // maxNumberAttempts
+  // These tasks determine whether the job needs to be FAILED or not.
+  protected static void addGivenUpPartitions(Set<Integer> set, JobContext ctx,
       Iterable<Integer> pIds, JobConfig cfg) {
     for (Integer pId : pIds) {
       if (isTaskGivenup(ctx, cfg, pId)) {
@@ -755,6 +762,17 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
+  // Add all partitions that have reached their maxNumberAttempts. These tasks should not be
+  // considered for scheduling again.
+  protected static void addPartitionsReachedMaximumRetries(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds, JobConfig cfg) {
+    for (Integer pId : pIds) {
+      if (ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask()) {
+        set.add(pId);
+      }
+    }
+  }
+
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, Set<Integer> throttled, int n) {
     List<Integer> result = new ArrayList<>();
@@ -829,7 +847,8 @@ public abstract class AbstractTaskDispatcher {
     if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
       return true;
     }
-    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) {
+    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR
+        || state == TaskPartitionState.DROPPED) {
       return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
     }
     return false;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c2b724b..b10eb5e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -245,7 +245,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
         partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);
 
-    addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
+    addGivenUpPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
     if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
         || (jobCfg.getTargetResource() != null
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
index 2a12568..ae724f0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
@@ -304,17 +304,17 @@ public class TestForceDeleteWorkflow extends TaskTestBase {
     //             JOB1 JOB2
 
     JobConfig.Builder jobBuilder0 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
     JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
     JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
new file mode 100644
index 0000000..8683c3f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Test to check that the maximum number of attempts is respected while the target partition is
+ * switching continuously.
+ */
+public class TestMaxNumberOfAttemptsMasterSwitch extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+  private List<String> _assignmentList1;
+  private List<String> _assignmentList2;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _driver = new TaskDriver(_manager);
+
+    // Assignment1: localhost_12918: Master, localhost_12919:Slave, localhost_12920: Slave
+    _assignmentList1 = new ArrayList<>();
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+
+    // Assignment2: localhost_12919: Master, localhost_12918:Slave, localhost_12920: Slave
+    _assignmentList2 = new ArrayList<>();
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testMaxNumberOfAttemptsMasterSwitch() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+    int maxNumberOfAttempts = 5;
+    assignCustomizedIdealState(_assignmentList1);
+
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(maxNumberOfAttempts)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+    String nameSpacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, "JOB0");
+
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, nameSpacedJobName, TaskState.IN_PROGRESS);
+    boolean isAssignmentInIdealState = true;
+
+    // Switch the master between the two instances (2 * maxNumberOfAttempts = 10 times) and make
+    // sure the task gets retried and the number of attempts is incremented each time.
+    // Also make sure that the task won't be retried more than maxNumberOfAttempts
+    for (int i = 1; i <= 2 * maxNumberOfAttempts; i++) {
+      int expectedRetryNumber = Math.min(i, maxNumberOfAttempts);
+      Assert
+          .assertTrue(
+              TestHelper.verify(
+                  () -> (_driver.getJobContext(nameSpacedJobName)
+                      .getPartitionNumAttempts(0) == expectedRetryNumber),
+                  TestHelper.WAIT_DURATION));
+      if (isAssignmentInIdealState) {
+        assignCustomizedIdealState(_assignmentList2);
+        verifyMastership(_assignmentList2);
+        isAssignmentInIdealState = false;
+      } else {
+        assignCustomizedIdealState(_assignmentList1);
+        verifyMastership(_assignmentList1);
+        isAssignmentInIdealState = true;
+      }
+    }
+
+    // Since the task reaches the max number of attempts, the job will fail.
+    _driver.pollForJobState(jobQueueName, nameSpacedJobName, TaskState.FAILED);
+    Assert.assertEquals(_driver.getJobContext(nameSpacedJobName).getPartitionNumAttempts(0),
+        maxNumberOfAttempts);
+  }
+
+  private void assignCustomizedIdealState(List<String> _assignmentList) {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DATABASE);
+    idealState.setPartitionState(DATABASE + "_0", _assignmentList.get(0), "MASTER");
+    idealState.setPartitionState(DATABASE + "_0", _assignmentList.get(1), "SLAVE");
+    idealState.setPartitionState(DATABASE + "_0", _assignmentList.get(2), "SLAVE");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, DATABASE,
+        idealState);
+  }
+
+  private void verifyMastership(List<String> _assignmentList) throws Exception {
+    String instance = _assignmentList.get(0);
+    boolean isMasterSwitchedToCorrectInstance = TestHelper.verify(() -> {
+      ExternalView externalView =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DATABASE);
+      if (externalView == null) {
+        return false;
+      }
+      Map<String, String> stateMap = externalView.getStateMap(DATABASE + "_0");
+      if (stateMap == null) {
+        return false;
+      }
+      return "MASTER".equals(stateMap.get(instance));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isMasterSwitchedToCorrectInstance);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 25cab50..08dc776 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -58,9 +58,9 @@ public class TestStopWorkflow extends TaskTestBase {
     stopTestSetup(5);
 
     String jobQueueName = TestHelper.getTestMethodName();
-    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+    JobConfig.Builder jobBuilder =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setWorkflow(jobQueueName)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index b79dcb9..e849be2 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -93,7 +93,7 @@ public class TestTargetedTaskStateChange {
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
         _liveInstances, _instanceConfigs);
     when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
-    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateOrMessageChange()).thenReturn(true);
     Set<String> inflightJobDag = new HashSet<>();
     inflightJobDag.add(JOB_NAME);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
@@ -130,7 +130,7 @@ public class TestTargetedTaskStateChange {
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
         _liveInstances, _instanceConfigs);
     when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
-    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateOrMessageChange()).thenReturn(false);
     Set<String> inflightJobDag = new HashSet<>();
     inflightJobDag.add(JOB_NAME);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())