You are viewing a plain text version of this content. The link to the canonical version is not included in this plain text copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/26 01:07:12 UTC
git commit: [HELIX-482] Support delayed task reassignment
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 19de46fe5 -> c2e411328
[HELIX-482] Support delayed task reassignment
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c2e41132
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c2e41132
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c2e41132
Branch: refs/heads/helix-0.6.x
Commit: c2e411328d4b6fbfd94eddc2c48c69284d2130c3
Parents: 19de46f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Aug 22 13:46:07 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Aug 25 16:06:51 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 27 ++++-
.../java/org/apache/helix/task/JobContext.java | 102 ++++++++-----------
.../org/apache/helix/task/TaskRebalancer.java | 86 +++++++++++++---
.../org/apache/helix/task/beans/JobBean.java | 1 +
.../task/TestIndependentTaskRebalancer.java | 51 ++++++++++
5 files changed, 190 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 1dad5e4..780db55 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -67,6 +67,8 @@ public class JobConfig {
public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
/** The number of tasks within the job that are allowed to fail. */
public static final String FAILURE_THRESHOLD = "FailureThreshold";
+ /** The amount of time in ms to wait before retrying a task */
+ public static final String TASK_RETRY_DELAY = "TaskRetryDelay";
/** The individual task configurations, if any **/
public static final String TASK_CONFIGS = "TaskConfigs";
@@ -74,6 +76,7 @@ public class JobConfig {
// // Default property values ////
public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
+ public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
public static final int DEFAULT_FAILURE_THRESHOLD = 0;
@@ -90,12 +93,14 @@ public class JobConfig {
private final int _maxAttemptsPerTask;
private final int _maxForcedReassignmentsPerTask;
private final int _failureThreshold;
+ private final long _retryDelay;
private final Map<String, TaskConfig> _taskConfigMap;
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
- int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
+ int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
+ Map<String, TaskConfig> taskConfigMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -107,6 +112,7 @@ public class JobConfig {
_maxAttemptsPerTask = maxAttemptsPerTask;
_maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask;
_failureThreshold = failureThreshold;
+ _retryDelay = retryDelay;
if (taskConfigMap != null) {
_taskConfigMap = taskConfigMap;
} else {
@@ -158,6 +164,10 @@ public class JobConfig {
return _failureThreshold;
}
+ public long getTaskRetryDelay() {
+ return _retryDelay;
+ }
+
public Map<String, TaskConfig> getTaskConfigMap() {
return _taskConfigMap;
}
@@ -187,6 +197,9 @@ public class JobConfig {
if (_targetPartitions != null) {
cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
}
+ if (_retryDelay > 0) {
+ cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay);
+ }
cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
@@ -210,13 +223,15 @@ public class JobConfig {
private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
+ private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
public JobConfig build() {
validate();
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
- _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap);
+ _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
+ _taskConfigMap);
}
/**
@@ -264,6 +279,9 @@ public class JobConfig {
if (cfg.containsKey(FAILURE_THRESHOLD)) {
b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
}
+ if (cfg.containsKey(TASK_RETRY_DELAY)) {
+ b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
+ }
return b;
}
@@ -322,6 +340,11 @@ public class JobConfig {
return this;
}
+ public Builder setTaskRetryDelay(long v) {
+ _retryDelay = v;
+ return this;
+ }
+
public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
if (taskConfigs != null) {
for (TaskConfig taskConfig : taskConfigs) {
http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index f843834..77885cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -44,7 +44,8 @@ public class JobContext extends HelixProperty {
FINISH_TIME,
TARGET,
TASK_ID,
- ASSIGNED_PARTICIPANT
+ ASSIGNED_PARTICIPANT,
+ NEXT_RETRY_TIME
}
public JobContext(ZNRecord record) {
@@ -76,21 +77,15 @@ public class JobContext extends HelixProperty {
}
public void setPartitionState(int p, TaskPartitionState s) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
+ Map<String, String> map = getMapField(p);
map.put(ContextProperties.STATE.toString(), s.name());
}
public TaskPartitionState getPartitionState(int p) {
- Map<String, String> map = _record.getMapField(String.valueOf(p));
+ Map<String, String> map = getMapField(p);
if (map == null) {
return null;
}
-
String str = map.get(ContextProperties.STATE.toString());
if (str != null) {
return TaskPartitionState.valueOf(str);
@@ -100,12 +95,7 @@ public class JobContext extends HelixProperty {
}
public void setPartitionNumAttempts(int p, int n) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
+ Map<String, String> map = getMapField(p);
map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
}
@@ -120,61 +110,42 @@ public class JobContext extends HelixProperty {
}
public int getPartitionNumAttempts(int p) {
- Map<String, String> map = _record.getMapField(String.valueOf(p));
+ Map<String, String> map = getMapField(p);
if (map == null) {
return -1;
}
-
String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString());
if (nStr == null) {
return -1;
}
-
return Integer.parseInt(nStr);
}
public void setPartitionFinishTime(int p, long t) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
+ Map<String, String> map = getMapField(p);
map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
}
public long getPartitionFinishTime(int p) {
- Map<String, String> map = _record.getMapField(String.valueOf(p));
+ Map<String, String> map = getMapField(p);
if (map == null) {
return -1;
}
-
String tStr = map.get(ContextProperties.FINISH_TIME.toString());
if (tStr == null) {
return -1;
}
-
return Long.parseLong(tStr);
}
public void setPartitionTarget(int p, String targetPName) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
+ Map<String, String> map = getMapField(p);
map.put(ContextProperties.TARGET.toString(), targetPName);
}
public String getTargetForPartition(int p) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- return null;
- } else {
- return map.get(ContextProperties.TARGET.toString());
- }
+ Map<String, String> map = getMapField(p);
+ return (map != null) ? map.get(ContextProperties.TARGET.toString()) : null;
}
public Map<String, List<Integer>> getPartitionsByTarget() {
@@ -206,23 +177,13 @@ public class JobContext extends HelixProperty {
}
public void setTaskIdForPartition(int p, String taskId) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
+ Map<String, String> map = getMapField(p);
map.put(ContextProperties.TASK_ID.toString(), taskId);
}
public String getTaskIdForPartition(int p) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- return null;
- } else {
- return map.get(ContextProperties.TASK_ID.toString());
- }
+ Map<String, String> map = getMapField(p);
+ return (map != null) ? map.get(ContextProperties.TASK_ID.toString()) : null;
}
public Map<String, Integer> getTaskIdPartitionMap() {
@@ -238,18 +199,39 @@ public class JobContext extends HelixProperty {
}
public void setAssignedParticipant(int p, String participantName) {
- String pStr = String.valueOf(p);
- Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- map = new TreeMap<String, String>();
- _record.setMapField(pStr, map);
- }
+ Map<String, String> map = getMapField(p);
map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
}
public String getAssignedParticipant(int p) {
+ Map<String, String> map = getMapField(p);
+ return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
+ }
+
+ public void setNextRetryTime(int p, long t) {
+ Map<String, String> map = getMapField(p);
+ map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
+ }
+
+ public long getNextRetryTime(int p) {
+ Map<String, String> map = getMapField(p);
+ if (map == null) {
+ return -1;
+ }
+ String tStr = map.get(ContextProperties.NEXT_RETRY_TIME.toString());
+ if (tStr == null) {
+ return -1;
+ }
+ return Long.parseLong(tStr);
+ }
+
+ public Map<String, String> getMapField(int p) {
String pStr = String.valueOf(p);
Map<String, String> map = _record.getMapField(pStr);
- return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ return map;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index f8c6415..131236e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -64,8 +64,8 @@ import com.google.common.collect.Sets;
public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
- // Management of already-scheduled workflows across jobs
- private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create();
+ // Management of already-scheduled rebalances across jobs
+ private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
.newSingleThreadScheduledExecutor();
@@ -252,6 +252,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
Map<String, SortedSet<Integer>> taskAssignments =
getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+ long currentTime = System.currentTimeMillis();
for (String instance : taskAssignments.keySet()) {
Set<Integer> pSet = taskAssignments.get(instance);
// Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
@@ -360,7 +361,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
}
if (!successOptional) {
- long finishTime = System.currentTimeMillis();
+ long finishTime = currentTime;
workflowCtx.setJobState(jobResource, TaskState.FAILED);
if (workflowConfig.isTerminable()) {
workflowCtx.setWorkflowState(TaskState.FAILED);
@@ -374,6 +375,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
skippedPartitions.add(pId);
partitionsToDropFromIs.add(pId);
}
+ } else {
+ // Mark the task to be started at some later time (if enabled)
+ markPartitionDelayed(jobCfg, jobCtx, pId);
}
}
break;
@@ -395,8 +399,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
pSet.removeAll(donePartitions);
}
+ // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
+ scheduleForNextTask(jobResource, jobCtx, currentTime);
+
if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) {
- long currentTime = System.currentTimeMillis();
workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
jobCtx.setFinishTime(currentTime);
if (isWorkflowComplete(workflowCtx, workflowConfig)) {
@@ -409,10 +415,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (jobTgtState == TargetState.START) {
// Contains the set of task partitions that must be excluded from consideration when making
// any new assignments.
- // This includes all completed, failed, already assigned partitions.
+ // This includes all completed, failed, delayed, and already assigned partitions.
Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
addCompletedPartitions(excludeSet, jobCtx, allPartitions);
excludeSet.addAll(skippedPartitions);
+ excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments =
getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
@@ -476,9 +483,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (delayFromStart <= 0) {
// Remove any timers that are past-time for this workflow
- Date scheduledTime = SCHEDULED_WORKFLOWS.get(workflowResource);
+ Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
- SCHEDULED_WORKFLOWS.remove(workflowResource);
+ SCHEDULED_TIMES.remove(workflowResource);
}
// Recurring workflows are just templates that spawn new workflows
@@ -534,8 +541,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
}
// No need to schedule the same runnable at the same time
- if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
- || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+ if (SCHEDULED_TIMES.containsKey(workflowResource)
+ || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
return false;
}
@@ -543,20 +550,50 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
return false;
}
- private void scheduleRebalance(String workflowResource, String jobResource, Date startTime,
- long delayFromStart) {
+ private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
// No need to schedule the same runnable at the same time
- if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
- || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+ if (SCHEDULED_TIMES.containsKey(id) || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
return;
}
// For workflows not yet scheduled, schedule them and record it
RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
- SCHEDULED_WORKFLOWS.put(workflowResource, startTime);
+ SCHEDULED_TIMES.put(id, startTime);
SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
}
+ private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
+ // Clear current entries if they exist and are expired
+ long currentTime = now;
+ Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
+ if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+ SCHEDULED_TIMES.remove(jobResource);
+ }
+
+ // Figure out the earliest schedulable time in the future of a non-complete job
+ boolean shouldSchedule = false;
+ long earliestTime = Long.MAX_VALUE;
+ for (int p : ctx.getPartitionSet()) {
+ long retryTime = ctx.getNextRetryTime(p);
+ TaskPartitionState state = ctx.getPartitionState(p);
+ state = (state != null) ? state : TaskPartitionState.INIT;
+ Set<TaskPartitionState> errorStates =
+ Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
+ TaskPartitionState.TIMED_OUT);
+ if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
+ earliestTime = retryTime;
+ shouldSchedule = true;
+ }
+ }
+
+ // If any was found, then schedule it
+ if (shouldSchedule) {
+ long delay = earliestTime - currentTime;
+ Date startTime = new Date(earliestTime);
+ scheduleRebalance(jobResource, jobResource, startTime, delay);
+ }
+ }
+
/**
* Checks if the job has completed.
* @param ctx The rebalancer context.
@@ -770,6 +807,15 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
return result;
}
+ private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
+ long delayInterval = cfg.getTaskRetryDelay();
+ if (delayInterval <= 0) {
+ return;
+ }
+ long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
+ ctx.setNextRetryTime(p, nextStartTime);
+ }
+
private static void markPartitionCompleted(JobContext ctx, int pId) {
ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
@@ -814,10 +860,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
}
}
}
-
return result;
}
+ private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+ Set<Integer> nonReadyPartitions = Sets.newHashSet();
+ for (int p : ctx.getPartitionSet()) {
+ long toStart = ctx.getNextRetryTime(p);
+ if (now < toStart) {
+ nonReadyPartitions.add(p);
+ }
+ }
+ return nonReadyPartitions;
+ }
+
/**
* Computes the partition name given the resource name and partition id.
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index bc5350a..32fd5ac 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -41,4 +41,5 @@ public class JobBean {
public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
+ public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index b7f20d1..1f17e92 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -35,6 +35,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.ScheduleConfig;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
@@ -104,6 +105,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
return new TaskTwo(context, instanceName);
}
});
+ taskFactoryReg.put("SingleFailTask", new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new SingleFailTask();
+ }
+ });
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
@@ -279,6 +286,33 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
}
+ @Test
+ public void testDelayedRetry() throws Exception {
+ // Create a single job with single task, set retry delay
+ int delay = 3000;
+ String jobName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+ Map<String, String> taskConfigMap = Maps.newHashMap();
+ TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
+ taskConfigs.add(taskConfig1);
+ workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+ workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+ workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay));
+ Map<String, String> jobConfigMap = Maps.newHashMap();
+ workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ SingleFailTask.hasFailed = false;
+ _driver.start(workflowBuilder.build());
+
+ // Ensure completion
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+ // Ensure a single retry happened
+ JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName);
+ Assert.assertEquals(jobCtx.getPartitionNumAttempts(0), 2);
+ Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay);
+ }
+
private class TaskOne extends ReindexTask {
private final boolean _shouldFail;
private final String _instanceName;
@@ -327,4 +361,21 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
super(context, instanceName);
}
}
+
+ private static class SingleFailTask implements Task {
+ public static boolean hasFailed = false;
+
+ @Override
+ public TaskResult run() {
+ if (!hasFailed) {
+ hasFailed = true;
+ return new TaskResult(Status.ERROR, null);
+ }
+ return new TaskResult(Status.COMPLETED, null);
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
}