You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:39 UTC

[helix] 03/10: Make the task scheduling decision independent of the PreviousAssignment (#994)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ce100d2230c2f136092efb0d2ca7ef6d7a697326
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue May 26 16:49:42 2020 -0700

    Make the task scheduling decision independent of the PreviousAssignment (#994)
    
    This commit changes the task scheduling logic so that it no longer depends on the
    PreviousAssignment (prevAssignment). Instead, task scheduling is based solely on the CurrentState.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 100 ++++++++++++++-------
 .../java/org/apache/helix/task/JobDispatcher.java  |  74 +++++----------
 .../helix/task/TestDropTerminalTasksUponReset.java |   3 +-
 3 files changed, 90 insertions(+), 87 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 60c2402..8934337 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -66,7 +66,7 @@ public abstract class AbstractTaskDispatcher {
   // Job Update related methods
 
   public void updatePreviousAssignedTasksStatus(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
       Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
@@ -78,11 +78,11 @@ public abstract class AbstractTaskDispatcher {
     AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
 
     // Iterate through all instances
-    for (String instance : prevInstanceToTaskAssignments.keySet()) {
+    for (String instance : currentInstanceToTaskAssignments.keySet()) {
       assignedPartitions.put(instance, new HashSet<>());
 
       // Set all dropping transitions first. These are tasks coming from Participant disconnects
-      // that have some active current state (INIT or RUNNING) and the requestedState of DROPPED.
+      // and have the requestedState of DROPPED.
       // These need to be prioritized over any other state transitions because of the race condition
       // with the same pId (task) running on other instances. This is because in paMap, we can only
       // define one transition per pId
@@ -99,7 +99,7 @@ public abstract class AbstractTaskDispatcher {
       }
 
       // If not an excluded instance, we must instantiate its entry in assignedPartitions
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
 
       // We need to remove all task pId's to be dropped because we already made an assignment in
       // paMap above for them to be dropped. The following does this.
@@ -107,8 +107,7 @@ public abstract class AbstractTaskDispatcher {
         pSet.removeAll(tasksToDrop.get(instance));
       }
 
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in either INIT or DROPPED states
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
@@ -121,17 +120,6 @@ public abstract class AbstractTaskDispatcher {
               instance, pId);
           continue;
         }
-        // This avoids a race condition in the case that although currentState is in the following
-        // error condition, the pending message (INIT->RUNNNING) might still be present.
-        // This is undesirable because this prevents JobContext from getting the proper update of
-        // fields including task state and task's NUM_ATTEMPTS
-        if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR
-            || currState == TaskPartitionState.TIMED_OUT
-            || currState == TaskPartitionState.TASK_ABORTED) {
-          // Do not increment the task attempt count here - it will be incremented at scheduling
-          // time
-          markPartitionError(jobCtx, pId, currState);
-        }
 
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
@@ -242,16 +230,16 @@ public abstract class AbstractTaskDispatcher {
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
                 pName, currState));
           }
           partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-
           // This task is COMPLETED, so release this task
           assignableInstanceManager.release(instance, taskConfig, quotaType);
         }
@@ -263,7 +251,11 @@ public abstract class AbstractTaskDispatcher {
         case TASK_ABORTED:
 
         case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          // First make this task which is in terminal state to be dropped.
+          // Later on, in next pipeline in handleAdditionalAssignments, the task will be retried if possible.
+          // (meaning it is not ABORTED and max number of attempts has not been reached yet)
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
@@ -389,13 +381,59 @@ public abstract class AbstractTaskDispatcher {
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }
+      if (currentState == TaskPartitionState.COMPLETED) {
+        markPartitionCompleted(jobCtx, pId);
+      }
+      // This avoids a race condition in the case that although currentState is in the following
+      // error condition, the pending message (INIT->RUNNNING) might still be present.
+      // This is undesirable because this prevents JobContext from getting the proper update of
+      // fields including task state and task's NUM_ATTEMPTS
+      if (currentState == TaskPartitionState.ERROR || currentState == TaskPartitionState.TASK_ERROR
+          || currentState == TaskPartitionState.TIMED_OUT
+          || currentState == TaskPartitionState.TASK_ABORTED) {
+        // Do not increment the task attempt count here - it will be incremented at scheduling
+        // time
+        markPartitionError(jobCtx, pId, currentState);
+      }
     }
-    return currentState;
   }
 
   /**
@@ -511,7 +549,7 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
       final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
       final WorkflowControllerDataProvider cache,
@@ -580,7 +618,7 @@ public abstract class AbstractTaskDispatcher {
       // TODO: isRebalanceRunningTask() was originally put in place to allow users to move
       // ("rebalance") long-running tasks, but there hasn't been a clear use case for this
       // Previously, there was a bug in the condition above (it was || where it should have been &&)
-      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
           jobCtx);
     }
 
@@ -588,11 +626,11 @@ public abstract class AbstractTaskDispatcher {
     if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
       // Drop current jobs only if they are assigned to a different instance, regardless of
       // the jobCfg.isRebalanceRunningTask() setting
-      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
           jobCtx);
     }
     // Go through ALL instances and assign/throttle tasks accordingly
-    for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet()) {
+    for (Map.Entry<String, SortedSet<Integer>> entry : currentInstanceToTaskAssignments.entrySet()) {
       String instance = entry.getKey();
       if (!tgtPartitionAssignments.containsKey(instance)) {
         // There is no assignment made for this instance, so it is safe to skip
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c14cee9..b35252c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -238,21 +238,20 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // These dropping transitions will be prioritized above all task state transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, jobResource,
         currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
         assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache,
         tasksToDrop);
@@ -318,7 +317,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances, jobResource,
           currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
           prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
           taskAssignmentCal, allPartitions, currentTime, liveInstances);
@@ -380,45 +379,24 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
    * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName,
+  protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be dropped (after a
-    // copy-over, the Controller will send a message to drop the state currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds to result and update their states in JobContext in
+    // updatePreviousAssignedTasksStatus method.
     Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
@@ -426,26 +404,14 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), instance);
-        TaskPartitionState currState = TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here because old
-          // Participants, upon connection reset, set tasks' currentStates to INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          result.get(instance).add(pId);
           // Check if this task needs to be dropped. If so, we need to add to tasksToDrop no matter
           // what its current state is so that it will be dropped
+          // This is trying to drop tasks on a reconnected instance with a new sessionId that have
+          // all of their requestedState == DROPPED
           if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) {
             if (!tasksToDrop.containsKey(instance)) {
               tasksToDrop.put(instance, new HashSet<>());
@@ -462,10 +428,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
    * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is
    * deleted) it is added from context. Otherwise, the context won't be updated.
    * @param jobCtx Job Context
-   * @param prevInstanceToTaskAssignments instance -> partitionIds from previous assignment
+   * @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
    */
   protected void updateInstanceToTaskAssignmentsFromContext(JobContext jobCtx,
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments) {
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments) {
     for (Integer partition : jobCtx.getPartitionSet()) {
       // We must add all active task pIds back here
       // The states other than Running and Init do not need to be added.
@@ -474,9 +440,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
           || jobCtx.getPartitionState(partition) == TaskPartitionState.INIT) {
         String instance = jobCtx.getAssignedParticipant(partition);
         if (instance != null) {
-          if (prevInstanceToTaskAssignments.containsKey(instance)
-              && !prevInstanceToTaskAssignments.get(instance).contains(partition)) {
-            prevInstanceToTaskAssignments.get(instance).add(partition);
+          if (currentInstanceToTaskAssignments.containsKey(instance)
+              && !currentInstanceToTaskAssignments.get(instance).contains(partition)) {
+            currentInstanceToTaskAssignments.get(instance).add(partition);
           }
         }
       }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
index ca07e97..58dc2d1 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
@@ -90,8 +90,7 @@ public class TestDropTerminalTasksUponReset {
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
     // Call the static method we are testing
-    JobDispatcher.getPrevInstanceToTaskAssignments(liveInstances, prevAssignment, allTaskPartitions,
-        currentStateOutput, jobName, tasksToDrop);
+    JobDispatcher.getCurrentInstanceToTaskAssignments(liveInstances, currentStateOutput, jobName, tasksToDrop);
 
     // Check that tasksToDrop has (numTasks / 2) partitions as we intended regardless of what the
     // current states of the tasks were