You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/26 01:07:12 UTC

git commit: [HELIX-482] Support delayed task reassignment

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 19de46fe5 -> c2e411328


[HELIX-482] Support delayed task reassignment


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c2e41132
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c2e41132
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c2e41132

Branch: refs/heads/helix-0.6.x
Commit: c2e411328d4b6fbfd94eddc2c48c69284d2130c3
Parents: 19de46f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Aug 22 13:46:07 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Aug 25 16:06:51 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   |  27 ++++-
 .../java/org/apache/helix/task/JobContext.java  | 102 ++++++++-----------
 .../org/apache/helix/task/TaskRebalancer.java   |  86 +++++++++++++---
 .../org/apache/helix/task/beans/JobBean.java    |   1 +
 .../task/TestIndependentTaskRebalancer.java     |  51 ++++++++++
 5 files changed, 190 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 1dad5e4..780db55 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -67,6 +67,8 @@ public class JobConfig {
   public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
   /** The number of tasks within the job that are allowed to fail. */
   public static final String FAILURE_THRESHOLD = "FailureThreshold";
+  /** The amount of time in ms to wait before retrying a task */
+  public static final String TASK_RETRY_DELAY = "TaskRetryDelay";
 
   /** The individual task configurations, if any **/
   public static final String TASK_CONFIGS = "TaskConfigs";
@@ -74,6 +76,7 @@ public class JobConfig {
   // // Default property values ////
 
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
+  public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
   public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
   public static final int DEFAULT_FAILURE_THRESHOLD = 0;
@@ -90,12 +93,14 @@ public class JobConfig {
   private final int _maxAttemptsPerTask;
   private final int _maxForcedReassignmentsPerTask;
   private final int _failureThreshold;
+  private final long _retryDelay;
   private final Map<String, TaskConfig> _taskConfigMap;
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
-      int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
+      int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
+      Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -107,6 +112,7 @@ public class JobConfig {
     _maxAttemptsPerTask = maxAttemptsPerTask;
     _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask;
     _failureThreshold = failureThreshold;
+    _retryDelay = retryDelay;
     if (taskConfigMap != null) {
       _taskConfigMap = taskConfigMap;
     } else {
@@ -158,6 +164,10 @@ public class JobConfig {
     return _failureThreshold;
   }
 
+  public long getTaskRetryDelay() {
+    return _retryDelay;
+  }
+
   public Map<String, TaskConfig> getTaskConfigMap() {
     return _taskConfigMap;
   }
@@ -187,6 +197,9 @@ public class JobConfig {
     if (_targetPartitions != null) {
       cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
     }
+    if (_retryDelay > 0) {
+      cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay);
+    }
     cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
     cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
     cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
@@ -210,13 +223,15 @@ public class JobConfig {
     private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
     private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
+    private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
 
     public JobConfig build() {
       validate();
 
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap);
+          _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
+          _taskConfigMap);
     }
 
     /**
@@ -264,6 +279,9 @@ public class JobConfig {
       if (cfg.containsKey(FAILURE_THRESHOLD)) {
         b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
       }
+      if (cfg.containsKey(TASK_RETRY_DELAY)) {
+        b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
+      }
       return b;
     }
 
@@ -322,6 +340,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setTaskRetryDelay(long v) {
+      _retryDelay = v;
+      return this;
+    }
+
     public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
       if (taskConfigs != null) {
         for (TaskConfig taskConfig : taskConfigs) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index f843834..77885cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -44,7 +44,8 @@ public class JobContext extends HelixProperty {
     FINISH_TIME,
     TARGET,
     TASK_ID,
-    ASSIGNED_PARTICIPANT
+    ASSIGNED_PARTICIPANT,
+    NEXT_RETRY_TIME
   }
 
   public JobContext(ZNRecord record) {
@@ -76,21 +77,15 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionState(int p, TaskPartitionState s) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
+    Map<String, String> map = getMapField(p);
     map.put(ContextProperties.STATE.toString(), s.name());
   }
 
   public TaskPartitionState getPartitionState(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    Map<String, String> map = getMapField(p);
     if (map == null) {
       return null;
     }
-
     String str = map.get(ContextProperties.STATE.toString());
     if (str != null) {
       return TaskPartitionState.valueOf(str);
@@ -100,12 +95,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionNumAttempts(int p, int n) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
+    Map<String, String> map = getMapField(p);
     map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
   }
 
@@ -120,61 +110,42 @@ public class JobContext extends HelixProperty {
   }
 
   public int getPartitionNumAttempts(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    Map<String, String> map = getMapField(p);
     if (map == null) {
       return -1;
     }
-
     String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString());
     if (nStr == null) {
       return -1;
     }
-
     return Integer.parseInt(nStr);
   }
 
   public void setPartitionFinishTime(int p, long t) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
+    Map<String, String> map = getMapField(p);
     map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
   }
 
   public long getPartitionFinishTime(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    Map<String, String> map = getMapField(p);
     if (map == null) {
       return -1;
     }
-
     String tStr = map.get(ContextProperties.FINISH_TIME.toString());
     if (tStr == null) {
       return -1;
     }
-
     return Long.parseLong(tStr);
   }
 
   public void setPartitionTarget(int p, String targetPName) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
+    Map<String, String> map = getMapField(p);
     map.put(ContextProperties.TARGET.toString(), targetPName);
   }
 
   public String getTargetForPartition(int p) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      return null;
-    } else {
-      return map.get(ContextProperties.TARGET.toString());
-    }
+    Map<String, String> map = getMapField(p);
+    return (map != null) ? map.get(ContextProperties.TARGET.toString()) : null;
   }
 
   public Map<String, List<Integer>> getPartitionsByTarget() {
@@ -206,23 +177,13 @@ public class JobContext extends HelixProperty {
   }
 
   public void setTaskIdForPartition(int p, String taskId) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
+    Map<String, String> map = getMapField(p);
     map.put(ContextProperties.TASK_ID.toString(), taskId);
   }
 
   public String getTaskIdForPartition(int p) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      return null;
-    } else {
-      return map.get(ContextProperties.TASK_ID.toString());
-    }
+    Map<String, String> map = getMapField(p);
+    return (map != null) ? map.get(ContextProperties.TASK_ID.toString()) : null;
   }
 
   public Map<String, Integer> getTaskIdPartitionMap() {
@@ -238,18 +199,39 @@ public class JobContext extends HelixProperty {
   }
 
   public void setAssignedParticipant(int p, String participantName) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
+    Map<String, String> map = getMapField(p);
     map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
   }
 
   public String getAssignedParticipant(int p) {
+    Map<String, String> map = getMapField(p);
+    return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
+  }
+
+  public void setNextRetryTime(int p, long t) {
+    Map<String, String> map = getMapField(p);
+    map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getNextRetryTime(int p) {
+    Map<String, String> map = getMapField(p);
+    if (map == null) {
+      return -1;
+    }
+    String tStr = map.get(ContextProperties.NEXT_RETRY_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+    return Long.parseLong(tStr);
+  }
+
+  public Map<String, String> getMapField(int p) {
     String pStr = String.valueOf(p);
     Map<String, String> map = _record.getMapField(pStr);
-    return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    return map;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index f8c6415..131236e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -64,8 +64,8 @@ import com.google.common.collect.Sets;
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
 
-  // Management of already-scheduled workflows across jobs
-  private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create();
+  // Management of already-scheduled rebalances across jobs
+  private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
   private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
       .newSingleThreadScheduledExecutor();
 
@@ -252,6 +252,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
     Map<String, SortedSet<Integer>> taskAssignments =
         getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    long currentTime = System.currentTimeMillis();
     for (String instance : taskAssignments.keySet()) {
       Set<Integer> pSet = taskAssignments.get(instance);
       // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
@@ -360,7 +361,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
             }
 
             if (!successOptional) {
-              long finishTime = System.currentTimeMillis();
+              long finishTime = currentTime;
               workflowCtx.setJobState(jobResource, TaskState.FAILED);
               if (workflowConfig.isTerminable()) {
                 workflowCtx.setWorkflowState(TaskState.FAILED);
@@ -374,6 +375,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
               skippedPartitions.add(pId);
               partitionsToDropFromIs.add(pId);
             }
+          } else {
+            // Mark the task to be started at some later time (if enabled)
+            markPartitionDelayed(jobCfg, jobCtx, pId);
           }
         }
           break;
@@ -395,8 +399,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       pSet.removeAll(donePartitions);
     }
 
+    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
+    scheduleForNextTask(jobResource, jobCtx, currentTime);
+
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) {
-      long currentTime = System.currentTimeMillis();
       workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
       jobCtx.setFinishTime(currentTime);
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {
@@ -409,10 +415,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     if (jobTgtState == TargetState.START) {
       // Contains the set of task partitions that must be excluded from consideration when making
       // any new assignments.
-      // This includes all completed, failed, already assigned partitions.
+      // This includes all completed, failed, delayed, and already assigned partitions.
       Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
       addCompletedPartitions(excludeSet, jobCtx, allPartitions);
       excludeSet.addAll(skippedPartitions);
+      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
       // Get instance->[partition, ...] mappings for the target resource.
       Map<String, SortedSet<Integer>> tgtPartitionAssignments =
           getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
@@ -476,9 +483,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
     if (delayFromStart <= 0) {
       // Remove any timers that are past-time for this workflow
-      Date scheduledTime = SCHEDULED_WORKFLOWS.get(workflowResource);
+      Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
       if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-        SCHEDULED_WORKFLOWS.remove(workflowResource);
+        SCHEDULED_TIMES.remove(workflowResource);
       }
 
       // Recurring workflows are just templates that spawn new workflows
@@ -534,8 +541,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // No need to schedule the same runnable at the same time
-    if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
-        || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+    if (SCHEDULED_TIMES.containsKey(workflowResource)
+        || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
       return false;
     }
 
@@ -543,20 +550,50 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return false;
   }
 
-  private void scheduleRebalance(String workflowResource, String jobResource, Date startTime,
-      long delayFromStart) {
+  private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
     // No need to schedule the same runnable at the same time
-    if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
-        || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+    if (SCHEDULED_TIMES.containsKey(id) || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
       return;
     }
 
     // For workflows not yet scheduled, schedule them and record it
     RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
-    SCHEDULED_WORKFLOWS.put(workflowResource, startTime);
+    SCHEDULED_TIMES.put(id, startTime);
     SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
   }
 
+  private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
+    // Clear current entries if they exist and are expired
+    long currentTime = now;
+    Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
+    if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+      SCHEDULED_TIMES.remove(jobResource);
+    }
+
+    // Figure out the earliest schedulable time in the future of a non-complete job
+    boolean shouldSchedule = false;
+    long earliestTime = Long.MAX_VALUE;
+    for (int p : ctx.getPartitionSet()) {
+      long retryTime = ctx.getNextRetryTime(p);
+      TaskPartitionState state = ctx.getPartitionState(p);
+      state = (state != null) ? state : TaskPartitionState.INIT;
+      Set<TaskPartitionState> errorStates =
+          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
+              TaskPartitionState.TIMED_OUT);
+      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
+        earliestTime = retryTime;
+        shouldSchedule = true;
+      }
+    }
+
+    // If any was found, then schedule it
+    if (shouldSchedule) {
+      long delay = earliestTime - currentTime;
+      Date startTime = new Date(earliestTime);
+      scheduleRebalance(jobResource, jobResource, startTime, delay);
+    }
+  }
+
   /**
    * Checks if the job has completed.
    * @param ctx The rebalancer context.
@@ -770,6 +807,15 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return result;
   }
 
+  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
+    long delayInterval = cfg.getTaskRetryDelay();
+    if (delayInterval <= 0) {
+      return;
+    }
+    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
+    ctx.setNextRetryTime(p, nextStartTime);
+  }
+
   private static void markPartitionCompleted(JobContext ctx, int pId) {
     ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
@@ -814,10 +860,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         }
       }
     }
-
     return result;
   }
 
+  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+    Set<Integer> nonReadyPartitions = Sets.newHashSet();
+    for (int p : ctx.getPartitionSet()) {
+      long toStart = ctx.getNextRetryTime(p);
+      if (now < toStart) {
+        nonReadyPartitions.add(p);
+      }
+    }
+    return nonReadyPartitions;
+  }
+
   /**
    * Computes the partition name given the resource name and partition id.
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index bc5350a..32fd5ac 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -41,4 +41,5 @@ public class JobBean {
   public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
   public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
   public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
+  public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index b7f20d1..1f17e92 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -35,6 +35,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.ScheduleConfig;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
@@ -104,6 +105,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
           return new TaskTwo(context, instanceName);
         }
       });
+      taskFactoryReg.put("SingleFailTask", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new SingleFailTask();
+        }
+      });
 
       _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
 
@@ -279,6 +286,33 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
   }
 
+  @Test
+  public void testDelayedRetry() throws Exception {
+    // Create a single job with single task, set retry delay
+    int delay = 3000;
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    Map<String, String> taskConfigMap = Maps.newHashMap();
+    TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
+    taskConfigs.add(taskConfig1);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay));
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    SingleFailTask.hasFailed = false;
+    _driver.start(workflowBuilder.build());
+
+    // Ensure completion
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure a single retry happened
+    JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName);
+    Assert.assertEquals(jobCtx.getPartitionNumAttempts(0), 2);
+    Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay);
+  }
+
   private class TaskOne extends ReindexTask {
     private final boolean _shouldFail;
     private final String _instanceName;
@@ -327,4 +361,21 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
       super(context, instanceName);
     }
   }
+
+  private static class SingleFailTask implements Task {
+    public static boolean hasFailed = false;
+
+    @Override
+    public TaskResult run() {
+      if (!hasFailed) {
+        hasFailed = true;
+        return new TaskResult(Status.ERROR, null);
+      }
+      return new TaskResult(Status.COMPLETED, null);
+    }
+
+    @Override
+    public void cancel() {
+    }
+  }
 }