You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/27 22:58:35 UTC
git commit: [HELIX-438] Improve task framework retry logic
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 4ff7e3888 -> 45c17f425
[HELIX-438] Improve task framework retry logic
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/45c17f42
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/45c17f42
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/45c17f42
Branch: refs/heads/helix-0.6.x
Commit: 45c17f4251f71f5422b0292cf5e539380a9308e4
Parents: 4ff7e38
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri May 23 14:22:48 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue May 27 13:58:16 2014 -0700
----------------------------------------------------------------------
.../helix/task/FixedTargetTaskRebalancer.java | 13 +--
.../helix/task/GenericTaskRebalancer.java | 91 ++++++++++++++++++--
.../java/org/apache/helix/task/JobConfig.java | 28 +++++-
.../org/apache/helix/task/TaskRebalancer.java | 8 +-
.../java/org/apache/helix/task/TaskRunner.java | 14 ++-
.../java/org/apache/helix/task/Workflow.java | 47 +++++-----
.../org/apache/helix/task/beans/JobBean.java | 3 +-
.../task/TestIndependentTaskRebalancer.java | 89 ++++++++++++++-----
8 files changed, 231 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index dc6fbaa..4c4fee1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
* under the License.
*/
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -51,7 +52,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
@Override
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
- ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+ ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Set<Integer> partitionSet, ClusterDataCache cache) {
IdealState tgtIs = getTgtIdealState(jobCfg, cache);
@@ -59,7 +60,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
return Collections.emptyMap();
}
Set<String> tgtStates = jobCfg.getTargetPartitionStates();
- return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet,
+ return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
jobContext);
}
@@ -114,7 +115,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
/**
* Get partition assignments for the target resource, but only for the partitions of interest.
* @param currStateOutput The current state of the instances in the cluster.
- * @param instanceList The set of instances.
+ * @param instances The instances.
* @param tgtIs The ideal state of the target resource.
* @param tgtStates Only partitions in this set of states will be considered. If null, partitions
* do not need to
@@ -123,10 +124,10 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
* @return A map of instance vs set of partition ids assigned to that instance.
*/
private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
- CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+ CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instanceList) {
+ for (String instance : instances) {
result.put(instance, new TreeSet<Integer>());
}
@@ -138,7 +139,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer {
}
int pId = partitions.get(0);
if (includeSet.contains(pId)) {
- for (String instance : instanceList) {
+ for (String instance : instances) {
String s =
currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
instance);
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
index 9174eb1..8903ae2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -37,6 +38,8 @@ import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import com.google.common.base.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -46,6 +49,9 @@ import com.google.common.collect.Sets;
* assignment to target partitions and states of another resource
*/
public class GenericTaskRebalancer extends TaskRebalancer {
+ /** Reassignment policy for this algorithm */
+ private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
+
@Override
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
@@ -63,7 +69,7 @@ public class GenericTaskRebalancer extends TaskRebalancer {
@Override
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
- ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+ ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Set<Integer> partitionSet, ClusterDataCache cache) {
// Gather input to the full auto rebalancing algorithm
@@ -115,7 +121,7 @@ public class GenericTaskRebalancer extends TaskRebalancer {
new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
new AutoRebalanceStrategy.DefaultPlacementScheme());
List<String> allNodes =
- Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
+ Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
Collections.sort(allNodes);
ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
Map<String, List<String>> preferenceLists = record.getListFields();
@@ -133,6 +139,9 @@ public class GenericTaskRebalancer extends TaskRebalancer {
taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
}
}
+
+ // Finally, adjust the assignment if tasks have been failing
+ taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
return taskAssignment;
}
@@ -140,14 +149,14 @@ public class GenericTaskRebalancer extends TaskRebalancer {
* Filter a list of instances based on targeted resource policies
* @param jobCfg the job configuration
* @param currStateOutput the current state of all instances in the cluster
- * @param instanceList valid instances
+ * @param instances valid instances
* @param cache current snapshot of the cluster
* @return a set of instances that can be assigned to
*/
private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
- Iterable<String> instanceList, ClusterDataCache cache) {
+ Iterable<String> instances, ClusterDataCache cache) {
// No target resource means any instance is available
- Set<String> allInstances = Sets.newHashSet(instanceList);
+ Set<String> allInstances = Sets.newHashSet(instances);
String targetResource = jobCfg.getTargetResource();
if (targetResource == null) {
return allInstances;
@@ -183,4 +192,76 @@ public class GenericTaskRebalancer extends TaskRebalancer {
allInstances.retainAll(eligibleInstances);
return allInstances;
}
+
+ public interface RetryPolicy {
+ /**
+ * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
+ * assigned
+ * @param jobCfg the job configuration
+ * @param jobCtx the job context
+ * @param instances instances that can serve tasks
+ * @param origAssignment the unmodified assignment
+ * @return the adjusted assignment
+ */
+ Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+ Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
+ }
+
+ private static class DefaultRetryReassigner implements RetryPolicy {
+ @Override
+ public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+ Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
+ // Compute an increasing integer ID for each instance
+ BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
+ int instanceIndex = 0;
+ for (String instance : instances) {
+ instanceMap.put(instance, instanceIndex++);
+ }
+
+ // Move partitions
+ Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
+ for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
+ String instance = e.getKey();
+ SortedSet<Integer> partitions = e.getValue();
+ Integer instanceId = instanceMap.get(instance);
+ if (instanceId != null) {
+ for (int p : partitions) {
+ // Determine for each partition if there have been failures with the current assignment
+ // strategy, and if so, force a shift in assignment for that partition only
+ int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
+ int newInstanceId = (instanceId + shiftValue) % instances.size();
+ String newInstance = instanceMap.inverse().get(newInstanceId);
+ if (newInstance == null) {
+ newInstance = instance;
+ }
+ if (!newAssignment.containsKey(newInstance)) {
+ newAssignment.put(newInstance, new TreeSet<Integer>());
+ }
+ newAssignment.get(newInstance).add(p);
+ }
+ } else {
+ // In case something goes wrong, just keep the previous assignment
+ newAssignment.put(instance, partitions);
+ }
+ }
+ return newAssignment;
+ }
+
+ /**
+ * In case tasks fail, we may not want to schedule them in the same place. This method allows us
+ * to compute a shifting value so that we can systematically choose other instances to try
+ * @param jobCfg the job configuration
+ * @param jobCtx the job context
+ * @param instances instances that can be chosen
+ * @param p the partition to look up
+ * @return the shifting value
+ */
+ private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
+ Collection<String> instances, int p) {
+ int numAttempts = jobCtx.getPartitionNumAttempts(p);
+ int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
+ int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
+ return numAttempts / (maxNumAttempts / numInstances);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index b166da1..3f9ab41 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -61,6 +61,8 @@ public class JobConfig {
public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
/** The maximum number of times the task rebalancer may attempt to execute a task. */
public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
+ /** The maximum number of times Helix will intentionally move a failing task */
+ public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask";
/** The number of concurrent tasks that are allowed to run on an instance. */
public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
/** The number of tasks within the job that are allowed to fail. */
@@ -75,6 +77,7 @@ public class JobConfig {
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
public static final int DEFAULT_FAILURE_THRESHOLD = 0;
+ public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
private final String _workflow;
private final String _targetResource;
@@ -85,13 +88,14 @@ public class JobConfig {
private final long _timeoutPerTask;
private final int _numConcurrentTasksPerInstance;
private final int _maxAttemptsPerTask;
+ private final int _maxForcedReassignmentsPerTask;
private final int _failureThreshold;
private final Map<String, TaskConfig> _taskConfigMap;
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
- int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
+ int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -101,6 +105,7 @@ public class JobConfig {
_timeoutPerTask = timeoutPerTask;
_numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
_maxAttemptsPerTask = maxAttemptsPerTask;
+ _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask;
_failureThreshold = failureThreshold;
if (taskConfigMap != null) {
_taskConfigMap = taskConfigMap;
@@ -145,6 +150,10 @@ public class JobConfig {
return _maxAttemptsPerTask;
}
+ public int getMaxForcedReassignmentsPerTask() {
+ return _maxForcedReassignmentsPerTask;
+ }
+
public int getFailureThreshold() {
return _failureThreshold;
}
@@ -180,6 +189,7 @@ public class JobConfig {
}
cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+ cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
return cfgMap;
}
@@ -198,6 +208,7 @@ public class JobConfig {
private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+ private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
public JobConfig build() {
@@ -205,7 +216,7 @@ public class JobConfig {
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
- _maxAttemptsPerTask, _failureThreshold, _taskConfigMap);
+ _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap);
}
/**
@@ -246,6 +257,10 @@ public class JobConfig {
if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
}
+ if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) {
+ b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg
+ .get(MAX_FORCED_REASSIGNMENTS_PER_TASK)));
+ }
if (cfg.containsKey(FAILURE_THRESHOLD)) {
b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
}
@@ -297,6 +312,11 @@ public class JobConfig {
return this;
}
+ public Builder setMaxForcedReassignmentsPerTask(int v) {
+ _maxForcedReassignmentsPerTask = v;
+ return this;
+ }
+
public Builder setFailureThreshold(int v) {
_failureThreshold = v;
return this;
@@ -340,6 +360,10 @@ public class JobConfig {
throw new IllegalArgumentException(String.format("%s has invalid value %s",
MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
}
+ if (_maxForcedReassignmentsPerTask < 0) {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask));
+ }
if (_failureThreshold < 0) {
throw new IllegalArgumentException(String.format("%s has invalid value %s",
FAILURE_THRESHOLD, _failureThreshold));
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 849f339..a6244c8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -71,7 +72,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
* Compute an assignment of tasks to instances
* @param currStateOutput the current state of the instances
* @param prevAssignment the previous task partition assignment
- * @param instanceList the instances
+ * @param instances the instances
* @param jobCfg the task configuration
* @param taskCtx the task context
* @param workflowCfg the workflow configuration
@@ -82,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
*/
public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
- Iterable<String> instanceList, JobConfig jobCfg, JobContext jobContext,
+ Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
ClusterDataCache cache);
@@ -181,7 +182,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
private ResourceAssignment computeResourceMapping(String jobResource,
WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
- Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
+ Collection<String> liveInstances, CurrentStateOutput currStateOutput,
WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
ClusterDataCache cache) {
TargetState jobTgtState = workflowConfig.getTargetState();
@@ -364,6 +365,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// This includes all completed, failed, already assigned partitions.
Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+ excludeSet.addAll(skippedPartitions);
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments =
getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index dea383b..7941acb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskResult.Status;
import org.apache.log4j.Logger;
/**
@@ -61,7 +62,12 @@ public class TaskRunner implements Runnable {
public void run() {
try {
signalStarted();
- _result = _task.run();
+ try {
+ _result = _task.run();
+ } catch (Throwable t) {
+ LOG.error("Problem running the task", t);
+ _result = new TaskResult(Status.ERROR, null);
+ }
switch (_result.getStatus()) {
case COMPLETED:
@@ -93,8 +99,10 @@ public class TaskRunner implements Runnable {
* Signals the task to cancel itself.
*/
public void timeout() {
- _timeout = true;
- cancel();
+ if (!_done) {
+ _timeout = true;
+ cancel();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 537f287..57404d8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -164,7 +164,9 @@ public class Workflow {
Joiner.on(",").join(job.targetPartitions));
}
builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
- String.valueOf(job.maxAttemptsPerPartition));
+ String.valueOf(job.maxAttemptsPerTask));
+ builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ String.valueOf(job.maxForcedReassignmentsPerTask));
builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
String.valueOf(job.numConcurrentTasksPerInstance));
builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
@@ -227,40 +229,41 @@ public class Workflow {
_expiry = -1;
}
- public Builder addConfig(String node, String key, String val) {
- node = namespacify(node);
- _dag.addNode(node);
- if (!_jobConfigs.containsKey(node)) {
- _jobConfigs.put(node, new TreeMap<String, String>());
+ public Builder addConfig(String job, String key, String val) {
+ job = namespacify(job);
+ _dag.addNode(job);
+ if (!_jobConfigs.containsKey(job)) {
+ _jobConfigs.put(job, new TreeMap<String, String>());
}
- _jobConfigs.get(node).put(key, val);
+ _jobConfigs.get(job).put(key, val);
return this;
}
- public Builder addJobConfigMap(String node, Map<String, String> jobConfigMap) {
- return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+ public Builder addJobConfigMap(String job, Map<String, String> jobConfigMap) {
+ return addConfig(job, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
}
- public Builder addJobConfig(String node, JobConfig jobConfig) {
+ public Builder addJobConfig(String job, JobConfig jobConfig) {
for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
String key = e.getKey();
String val = e.getValue();
- addConfig(node, key, val);
+ addConfig(job, key, val);
}
- addTaskConfigs(node, jobConfig.getTaskConfigMap().values());
+ jobConfig.getJobConfigMap().put(JobConfig.WORKFLOW_ID, _name);
+ addTaskConfigs(job, jobConfig.getTaskConfigMap().values());
return this;
}
- public Builder addTaskConfigs(String node, Collection<TaskConfig> taskConfigs) {
- node = namespacify(node);
- _dag.addNode(node);
- if (!_taskConfigs.containsKey(node)) {
- _taskConfigs.put(node, new ArrayList<TaskConfig>());
+ public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+ job = namespacify(job);
+ _dag.addNode(job);
+ if (!_taskConfigs.containsKey(job)) {
+ _taskConfigs.put(job, new ArrayList<TaskConfig>());
}
- if (!_jobConfigs.containsKey(node)) {
- _jobConfigs.put(node, new TreeMap<String, String>());
+ if (!_jobConfigs.containsKey(job)) {
+ _jobConfigs.put(job, new TreeMap<String, String>());
}
- _taskConfigs.get(node).addAll(taskConfigs);
+ _taskConfigs.get(job).addAll(taskConfigs);
return this;
}
@@ -277,8 +280,8 @@ public class Workflow {
return this;
}
- public String namespacify(String task) {
- return TaskUtil.getNamespacedJobName(_name, task);
+ public String namespacify(String job) {
+ return TaskUtil.getNamespacedJobName(_name, job);
}
public Workflow build() {
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index af5882c..bc5350a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -38,6 +38,7 @@ public class JobBean {
public List<TaskBean> tasks;
public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
- public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+ public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+ public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7041db8..10f0ac7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -62,6 +62,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
private final MockParticipantManager[] _participants = new MockParticipantManager[n];
private ClusterControllerManager _controller;
private Set<String> _invokedClasses = Sets.newHashSet();
+ private Map<String, Integer> _runCounts = Maps.newHashMap();
private HelixManager _manager;
private TaskDriver _driver;
@@ -81,24 +82,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
}
- // Set task callbacks
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("TaskOne", new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new TaskOne(context);
- }
- });
- taskFactoryReg.put("TaskTwo", new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new TaskTwo(context);
- }
- });
-
// start dummy participants
for (int i = 0; i < n; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ final String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+
+ // Set task callbacks
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put("TaskOne", new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new TaskOne(context, instanceName);
+ }
+ });
+ taskFactoryReg.put("TaskTwo", new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new TaskTwo(context, instanceName);
+ }
+ });
+
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
@@ -124,6 +126,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
@BeforeMethod
public void beforeMethod() {
_invokedClasses.clear();
+ _runCounts.clear();
}
@Test
@@ -207,10 +210,46 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
+ @Test
+ public void testReassignment() throws Exception {
+ final int NUM_INSTANCES = 2;
+ String jobName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+ Map<String, String> taskConfigMap =
+ Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_'
+ + START_PORT));
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+ taskConfigs.add(taskConfig1);
+ workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+ workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+ workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, ""
+ + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
+ Map<String, String> jobConfigMap = Maps.newHashMap();
+ jobConfigMap.put("Timeout", "1000");
+ workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ _driver.start(workflowBuilder.build());
+
+ // Ensure the job completes
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+ // Ensure that the class was invoked
+ Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+
+ // Ensure that this was tried on two different instances, the first of which exhausted the
+ // attempts number, and the other passes on the first try
+ Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
+ Assert.assertTrue(_runCounts.values().contains(
+ JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
+ Assert.assertTrue(_runCounts.values().contains(1));
+ }
+
private class TaskOne extends ReindexTask {
private final boolean _shouldFail;
+ private final String _instanceName;
- public TaskOne(TaskCallbackContext context) {
+ public TaskOne(TaskCallbackContext context, String instanceName) {
super(context);
// Check whether or not this task should succeed
@@ -220,15 +259,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Map<String, String> configMap = taskConfig.getConfigMap();
if (configMap != null && configMap.containsKey("fail")
&& Boolean.parseBoolean(configMap.get("fail"))) {
- shouldFail = true;
+ // if a specific instance is specified, only fail for that one
+ shouldFail =
+ !configMap.containsKey("failInstance")
+ || configMap.get("failInstance").equals(instanceName);
}
}
_shouldFail = shouldFail;
+
+ // Initialize the count for this instance if not already done
+ if (!_runCounts.containsKey(instanceName)) {
+ _runCounts.put(instanceName, 0);
+ }
+ _instanceName = instanceName;
}
@Override
public TaskResult run() {
_invokedClasses.add(getClass().getName());
+ _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
// Fail the task if it should fail
if (_shouldFail) {
@@ -240,8 +289,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
}
private class TaskTwo extends TaskOne {
- public TaskTwo(TaskCallbackContext context) {
- super(context);
+ public TaskTwo(TaskCallbackContext context, String instanceName) {
+ super(context, instanceName);
}
}
}