You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/27 22:58:35 UTC

git commit: [HELIX-438] Improve task framework retry logic

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 4ff7e3888 -> 45c17f425


[HELIX-438] Improve task framework retry logic


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/45c17f42
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/45c17f42
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/45c17f42

Branch: refs/heads/helix-0.6.x
Commit: 45c17f4251f71f5422b0292cf5e539380a9308e4
Parents: 4ff7e38
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri May 23 14:22:48 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue May 27 13:58:16 2014 -0700

----------------------------------------------------------------------
 .../helix/task/FixedTargetTaskRebalancer.java   | 13 +--
 .../helix/task/GenericTaskRebalancer.java       | 91 ++++++++++++++++++--
 .../java/org/apache/helix/task/JobConfig.java   | 28 +++++-
 .../org/apache/helix/task/TaskRebalancer.java   |  8 +-
 .../java/org/apache/helix/task/TaskRunner.java  | 14 ++-
 .../java/org/apache/helix/task/Workflow.java    | 47 +++++-----
 .../org/apache/helix/task/beans/JobBean.java    |  3 +-
 .../task/TestIndependentTaskRebalancer.java     | 89 ++++++++++++++-----
 8 files changed, 231 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index dc6fbaa..4c4fee1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,7 +52,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
 
   @Override
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
-      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, ClusterDataCache cache) {
     IdealState tgtIs = getTgtIdealState(jobCfg, cache);
@@ -59,7 +60,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
       return Collections.emptyMap();
     }
     Set<String> tgtStates = jobCfg.getTargetPartitionStates();
-    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet,
+    return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
         jobContext);
   }
 
@@ -114,7 +115,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
   /**
    * Get partition assignments for the target resource, but only for the partitions of interest.
    * @param currStateOutput The current state of the instances in the cluster.
-   * @param instanceList The set of instances.
+   * @param instances The instances.
    * @param tgtIs The ideal state of the target resource.
    * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
    *          do not need to
@@ -123,10 +124,10 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
    * @return A map of instance vs set of partition ids assigned to that instance.
    */
   private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
-      CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+      CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
       Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList) {
+    for (String instance : instances) {
       result.put(instance, new TreeSet<Integer>());
     }
 
@@ -138,7 +139,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
       }
       int pId = partitions.get(0);
       if (includeSet.contains(pId)) {
-        for (String instance : instanceList) {
+        for (String instance : instances) {
           String s =
               currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
                   instance);

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
index 9174eb1..8903ae2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -37,6 +38,8 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 
 import com.google.common.base.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -46,6 +49,9 @@ import com.google.common.collect.Sets;
  * assignment to target partitions and states of another resource
  */
 public class GenericTaskRebalancer extends TaskRebalancer {
+  /** Reassignment policy for this algorithm */
+  private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
+
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
@@ -63,7 +69,7 @@ public class GenericTaskRebalancer extends TaskRebalancer {
 
   @Override
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
-      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, ClusterDataCache cache) {
     // Gather input to the full auto rebalancing algorithm
@@ -115,7 +121,7 @@ public class GenericTaskRebalancer extends TaskRebalancer {
         new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
             new AutoRebalanceStrategy.DefaultPlacementScheme());
     List<String> allNodes =
-        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
     Collections.sort(allNodes);
     ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
     Map<String, List<String>> preferenceLists = record.getListFields();
@@ -133,6 +139,9 @@ public class GenericTaskRebalancer extends TaskRebalancer {
         taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
       }
     }
+
+    // Finally, adjust the assignment if tasks have been failing
+    taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
     return taskAssignment;
   }
 
@@ -140,14 +149,14 @@ public class GenericTaskRebalancer extends TaskRebalancer {
    * Filter a list of instances based on targeted resource policies
    * @param jobCfg the job configuration
    * @param currStateOutput the current state of all instances in the cluster
-   * @param instanceList valid instances
+   * @param instances valid instances
    * @param cache current snapshot of the cluster
    * @return a set of instances that can be assigned to
    */
   private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
-      Iterable<String> instanceList, ClusterDataCache cache) {
+      Iterable<String> instances, ClusterDataCache cache) {
     // No target resource means any instance is available
-    Set<String> allInstances = Sets.newHashSet(instanceList);
+    Set<String> allInstances = Sets.newHashSet(instances);
     String targetResource = jobCfg.getTargetResource();
     if (targetResource == null) {
       return allInstances;
@@ -183,4 +192,76 @@ public class GenericTaskRebalancer extends TaskRebalancer {
     allInstances.retainAll(eligibleInstances);
     return allInstances;
   }
+
+  public interface RetryPolicy {
+    /**
+     * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
+     * assigned
+     * @param jobCfg the job configuration
+     * @param jobCtx the job context
+     * @param instances instances that can serve tasks
+     * @param origAssignment the unmodified assignment
+     * @return the adjusted assignment
+     */
+    Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
+  }
+
+  private static class DefaultRetryReassigner implements RetryPolicy {
+    @Override
+    public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
+      // Compute an increasing integer ID for each instance
+      BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
+      int instanceIndex = 0;
+      for (String instance : instances) {
+        instanceMap.put(instance, instanceIndex++);
+      }
+
+      // Move partitions
+      Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
+      for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
+        String instance = e.getKey();
+        SortedSet<Integer> partitions = e.getValue();
+        Integer instanceId = instanceMap.get(instance);
+        if (instanceId != null) {
+          for (int p : partitions) {
+            // Determine for each partition if there have been failures with the current assignment
+            // strategy, and if so, force a shift in assignment for that partition only
+            int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
+            int newInstanceId = (instanceId + shiftValue) % instances.size();
+            String newInstance = instanceMap.inverse().get(newInstanceId);
+            if (newInstance == null) {
+              newInstance = instance;
+            }
+            if (!newAssignment.containsKey(newInstance)) {
+              newAssignment.put(newInstance, new TreeSet<Integer>());
+            }
+            newAssignment.get(newInstance).add(p);
+          }
+        } else {
+          // In case something goes wrong, just keep the previous assignment
+          newAssignment.put(instance, partitions);
+        }
+      }
+      return newAssignment;
+    }
+
+    /**
+     * In case tasks fail, we may not want to schedule them in the same place. This method allows us
+     * to compute a shifting value so that we can systematically choose other instances to try
+     * @param jobCfg the job configuration
+     * @param jobCtx the job context
+     * @param instances instances that can be chosen
+     * @param p the partition to look up
+     * @return the shifting value
+     */
+    private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, int p) {
+      int numAttempts = jobCtx.getPartitionNumAttempts(p);
+      int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
+      int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
+      return numAttempts / (maxNumAttempts / numInstances);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index b166da1..3f9ab41 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -61,6 +61,8 @@ public class JobConfig {
   public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
   /** The maximum number of times the task rebalancer may attempt to execute a task. */
   public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
+  /** The maximum number of times Helix will intentionally move a failing task */
+  public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask";
   /** The number of concurrent tasks that are allowed to run on an instance. */
   public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
   /** The number of tasks within the job that are allowed to fail. */
@@ -75,6 +77,7 @@ public class JobConfig {
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
   public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
   public static final int DEFAULT_FAILURE_THRESHOLD = 0;
+  public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
 
   private final String _workflow;
   private final String _targetResource;
@@ -85,13 +88,14 @@ public class JobConfig {
   private final long _timeoutPerTask;
   private final int _numConcurrentTasksPerInstance;
   private final int _maxAttemptsPerTask;
+  private final int _maxForcedReassignmentsPerTask;
   private final int _failureThreshold;
   private final Map<String, TaskConfig> _taskConfigMap;
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
-      int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
+      int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -101,6 +105,7 @@ public class JobConfig {
     _timeoutPerTask = timeoutPerTask;
     _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
     _maxAttemptsPerTask = maxAttemptsPerTask;
+    _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask;
     _failureThreshold = failureThreshold;
     if (taskConfigMap != null) {
       _taskConfigMap = taskConfigMap;
@@ -145,6 +150,10 @@ public class JobConfig {
     return _maxAttemptsPerTask;
   }
 
+  public int getMaxForcedReassignmentsPerTask() {
+    return _maxForcedReassignmentsPerTask;
+  }
+
   public int getFailureThreshold() {
     return _failureThreshold;
   }
@@ -180,6 +189,7 @@ public class JobConfig {
     }
     cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
     cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+    cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
     cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
     return cfgMap;
   }
@@ -198,6 +208,7 @@ public class JobConfig {
     private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+    private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
 
     public JobConfig build() {
@@ -205,7 +216,7 @@ public class JobConfig {
 
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerTask, _failureThreshold, _taskConfigMap);
+          _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap);
     }
 
     /**
@@ -246,6 +257,10 @@ public class JobConfig {
       if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
         b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
       }
+      if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) {
+        b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg
+            .get(MAX_FORCED_REASSIGNMENTS_PER_TASK)));
+      }
       if (cfg.containsKey(FAILURE_THRESHOLD)) {
         b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
       }
@@ -297,6 +312,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setMaxForcedReassignmentsPerTask(int v) {
+      _maxForcedReassignmentsPerTask = v;
+      return this;
+    }
+
     public Builder setFailureThreshold(int v) {
       _failureThreshold = v;
       return this;
@@ -340,6 +360,10 @@ public class JobConfig {
         throw new IllegalArgumentException(String.format("%s has invalid value %s",
             MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
       }
+      if (_maxForcedReassignmentsPerTask < 0) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask));
+      }
       if (_failureThreshold < 0) {
         throw new IllegalArgumentException(String.format("%s has invalid value %s",
             FAILURE_THRESHOLD, _failureThreshold));

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 849f339..a6244c8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -71,7 +72,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    * Compute an assignment of tasks to instances
    * @param currStateOutput the current state of the instances
    * @param prevAssignment the previous task partition assignment
-   * @param instanceList the instances
+   * @param instances the instances
    * @param jobCfg the task configuration
    * @param taskCtx the task context
    * @param workflowCfg the workflow configuration
@@ -82,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    */
   public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Iterable<String> instanceList, JobConfig jobCfg, JobContext jobContext,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
       ClusterDataCache cache);
 
@@ -181,7 +182,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
   private ResourceAssignment computeResourceMapping(String jobResource,
       WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
-      Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
+      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
       WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
       ClusterDataCache cache) {
     TargetState jobTgtState = workflowConfig.getTargetState();
@@ -364,6 +365,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       // This includes all completed, failed, already assigned partitions.
       Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
       addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+      excludeSet.addAll(skippedPartitions);
       // Get instance->[partition, ...] mappings for the target resource.
       Map<String, SortedSet<Integer>> tgtPartitionAssignments =
           getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index dea383b..7941acb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskResult.Status;
 import org.apache.log4j.Logger;
 
 /**
@@ -61,7 +62,12 @@ public class TaskRunner implements Runnable {
   public void run() {
     try {
       signalStarted();
-      _result = _task.run();
+      try {
+        _result = _task.run();
+      } catch (Throwable t) {
+        LOG.error("Problem running the task", t);
+        _result = new TaskResult(Status.ERROR, null);
+      }
 
       switch (_result.getStatus()) {
       case COMPLETED:
@@ -93,8 +99,10 @@ public class TaskRunner implements Runnable {
    * Signals the task to cancel itself.
    */
   public void timeout() {
-    _timeout = true;
-    cancel();
+    if (!_done) {
+      _timeout = true;
+      cancel();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 537f287..57404d8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -164,7 +164,9 @@ public class Workflow {
             Joiner.on(",").join(job.targetPartitions));
       }
       builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
-          String.valueOf(job.maxAttemptsPerPartition));
+          String.valueOf(job.maxAttemptsPerTask));
+      builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+          String.valueOf(job.maxForcedReassignmentsPerTask));
       builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
           String.valueOf(job.numConcurrentTasksPerInstance));
       builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
@@ -227,40 +229,41 @@ public class Workflow {
       _expiry = -1;
     }
 
-    public Builder addConfig(String node, String key, String val) {
-      node = namespacify(node);
-      _dag.addNode(node);
-      if (!_jobConfigs.containsKey(node)) {
-        _jobConfigs.put(node, new TreeMap<String, String>());
+    public Builder addConfig(String job, String key, String val) {
+      job = namespacify(job);
+      _dag.addNode(job);
+      if (!_jobConfigs.containsKey(job)) {
+        _jobConfigs.put(job, new TreeMap<String, String>());
       }
-      _jobConfigs.get(node).put(key, val);
+      _jobConfigs.get(job).put(key, val);
       return this;
     }
 
-    public Builder addJobConfigMap(String node, Map<String, String> jobConfigMap) {
-      return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+    public Builder addJobConfigMap(String job, Map<String, String> jobConfigMap) {
+      return addConfig(job, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
     }
 
-    public Builder addJobConfig(String node, JobConfig jobConfig) {
+    public Builder addJobConfig(String job, JobConfig jobConfig) {
       for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
         String key = e.getKey();
         String val = e.getValue();
-        addConfig(node, key, val);
+        addConfig(job, key, val);
       }
-      addTaskConfigs(node, jobConfig.getTaskConfigMap().values());
+      jobConfig.getJobConfigMap().put(JobConfig.WORKFLOW_ID, _name);
+      addTaskConfigs(job, jobConfig.getTaskConfigMap().values());
       return this;
     }
 
-    public Builder addTaskConfigs(String node, Collection<TaskConfig> taskConfigs) {
-      node = namespacify(node);
-      _dag.addNode(node);
-      if (!_taskConfigs.containsKey(node)) {
-        _taskConfigs.put(node, new ArrayList<TaskConfig>());
+    public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+      job = namespacify(job);
+      _dag.addNode(job);
+      if (!_taskConfigs.containsKey(job)) {
+        _taskConfigs.put(job, new ArrayList<TaskConfig>());
       }
-      if (!_jobConfigs.containsKey(node)) {
-        _jobConfigs.put(node, new TreeMap<String, String>());
+      if (!_jobConfigs.containsKey(job)) {
+        _jobConfigs.put(job, new TreeMap<String, String>());
       }
-      _taskConfigs.get(node).addAll(taskConfigs);
+      _taskConfigs.get(job).addAll(taskConfigs);
       return this;
     }
 
@@ -277,8 +280,8 @@ public class Workflow {
       return this;
     }
 
-    public String namespacify(String task) {
-      return TaskUtil.getNamespacedJobName(_name, task);
+    public String namespacify(String job) {
+      return TaskUtil.getNamespacedJobName(_name, job);
     }
 
     public Workflow build() {

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index af5882c..bc5350a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -38,6 +38,7 @@ public class JobBean {
   public List<TaskBean> tasks;
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-  public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+  public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+  public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
   public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7041db8..10f0ac7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -62,6 +62,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
   private final MockParticipantManager[] _participants = new MockParticipantManager[n];
   private ClusterControllerManager _controller;
   private Set<String> _invokedClasses = Sets.newHashSet();
+  private Map<String, Integer> _runCounts = Maps.newHashMap();
 
   private HelixManager _manager;
   private TaskDriver _driver;
@@ -81,24 +82,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
       setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
 
-    // Set task callbacks
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("TaskOne", new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new TaskOne(context);
-      }
-    });
-    taskFactoryReg.put("TaskTwo", new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new TaskTwo(context);
-      }
-    });
-
     // start dummy participants
     for (int i = 0; i < n; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+
+      // Set task callbacks
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+      taskFactoryReg.put("TaskOne", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new TaskOne(context, instanceName);
+        }
+      });
+      taskFactoryReg.put("TaskTwo", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new TaskTwo(context, instanceName);
+        }
+      });
+
       _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
 
       // Register a Task state model factory.
@@ -124,6 +126,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
   @BeforeMethod
   public void beforeMethod() {
     _invokedClasses.clear();
+    _runCounts.clear();
   }
 
   @Test
@@ -207,10 +210,46 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
+  @Test
+  public void testReassignment() throws Exception {
+    final int NUM_INSTANCES = 2;
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+    Map<String, String> taskConfigMap =
+        Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_'
+            + START_PORT));
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+    taskConfigs.add(taskConfig1);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, ""
+        + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that the class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+
+    // Ensure that this was tried on two different instances, the first of which exhausted the
+    // attempts number, and the other passes on the first try
+    Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
+    Assert.assertTrue(_runCounts.values().contains(
+        JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
+    Assert.assertTrue(_runCounts.values().contains(1));
+  }
+
   private class TaskOne extends ReindexTask {
     private final boolean _shouldFail;
+    private final String _instanceName;
 
-    public TaskOne(TaskCallbackContext context) {
+    public TaskOne(TaskCallbackContext context, String instanceName) {
       super(context);
 
       // Check whether or not this task should succeed
@@ -220,15 +259,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
         Map<String, String> configMap = taskConfig.getConfigMap();
         if (configMap != null && configMap.containsKey("fail")
             && Boolean.parseBoolean(configMap.get("fail"))) {
-          shouldFail = true;
+          // if a specific instance is specified, only fail for that one
+          shouldFail =
+              !configMap.containsKey("failInstance")
+                  || configMap.get("failInstance").equals(instanceName);
         }
       }
       _shouldFail = shouldFail;
+
+      // Initialize the count for this instance if not already done
+      if (!_runCounts.containsKey(instanceName)) {
+        _runCounts.put(instanceName, 0);
+      }
+      _instanceName = instanceName;
     }
 
     @Override
     public TaskResult run() {
       _invokedClasses.add(getClass().getName());
+      _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
 
       // Fail the task if it should fail
       if (_shouldFail) {
@@ -240,8 +289,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
   }
 
   private class TaskTwo extends TaskOne {
-    public TaskTwo(TaskCallbackContext context) {
-      super(context);
+    public TaskTwo(TaskCallbackContext context, String instanceName) {
+      super(context, instanceName);
     }
   }
 }