You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/22 20:09:19 UTC
[1/2] git commit: [HELIX-439] Support thresholding for job
success/failure
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x f3b2c4f66 -> 4ff7e3888
[HELIX-439] Support thresholding for job success/failure
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/efed8848
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/efed8848
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/efed8848
Branch: refs/heads/helix-0.6.x
Commit: efed8848fee9eeda847336f3de93f7ce4458b6a4
Parents: e8ad448
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed May 21 16:10:17 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed May 21 16:10:17 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 27 ++++++-
.../java/org/apache/helix/task/TaskConfig.java | 36 +++++++--
.../org/apache/helix/task/TaskRebalancer.java | 47 +++++++++---
.../java/org/apache/helix/task/Workflow.java | 2 +
.../org/apache/helix/task/beans/JobBean.java | 1 +
.../org/apache/helix/task/beans/TaskBean.java | 1 +
.../task/TestIndependentTaskRebalancer.java | 81 +++++++++++++++++++-
7 files changed, 174 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 90e3cfc..b166da1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -63,6 +63,8 @@ public class JobConfig {
public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
/** The number of concurrent tasks that are allowed to run on an instance. */
public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+ /** The number of tasks within the job that are allowed to fail. */
+ public static final String FAILURE_THRESHOLD = "FailureThreshold";
/** The individual task configurations, if any **/
public static final String TASK_CONFIGS = "TaskConfigs";
@@ -72,6 +74,7 @@ public class JobConfig {
public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+ public static final int DEFAULT_FAILURE_THRESHOLD = 0;
private final String _workflow;
private final String _targetResource;
@@ -82,12 +85,13 @@ public class JobConfig {
private final long _timeoutPerTask;
private final int _numConcurrentTasksPerInstance;
private final int _maxAttemptsPerTask;
+ private final int _failureThreshold;
private final Map<String, TaskConfig> _taskConfigMap;
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
- Map<String, TaskConfig> taskConfigMap) {
+ int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -97,6 +101,7 @@ public class JobConfig {
_timeoutPerTask = timeoutPerTask;
_numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
_maxAttemptsPerTask = maxAttemptsPerTask;
+ _failureThreshold = failureThreshold;
if (taskConfigMap != null) {
_taskConfigMap = taskConfigMap;
} else {
@@ -140,6 +145,10 @@ public class JobConfig {
return _maxAttemptsPerTask;
}
+ public int getFailureThreshold() {
+ return _failureThreshold;
+ }
+
public Map<String, TaskConfig> getTaskConfigMap() {
return _taskConfigMap;
}
@@ -171,6 +180,7 @@ public class JobConfig {
}
cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+ cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
return cfgMap;
}
@@ -188,13 +198,14 @@ public class JobConfig {
private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+ private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
public JobConfig build() {
validate();
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
- _maxAttemptsPerTask, _taskConfigMap);
+ _maxAttemptsPerTask, _failureThreshold, _taskConfigMap);
}
/**
@@ -235,6 +246,9 @@ public class JobConfig {
if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
}
+ if (cfg.containsKey(FAILURE_THRESHOLD)) {
+ b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+ }
return b;
}
@@ -283,6 +297,11 @@ public class JobConfig {
return this;
}
+ public Builder setFailureThreshold(int v) {
+ _failureThreshold = v;
+ return this;
+ }
+
public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
if (taskConfigs != null) {
for (TaskConfig taskConfig : taskConfigs) {
@@ -321,6 +340,10 @@ public class JobConfig {
throw new IllegalArgumentException(String.format("%s has invalid value %s",
MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
}
+ if (_failureThreshold < 0) {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ FAILURE_THRESHOLD, _failureThreshold));
+ }
if (_workflow == null) {
throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 547ba48..4ddab1a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -35,7 +35,8 @@ import com.google.common.collect.Maps;
public class TaskConfig {
private enum TaskConfigFields {
TASK_ID,
- TASK_COMMAND
+ TASK_COMMAND,
+ TASK_SUCCESS_OPTIONAL
}
private static final Logger LOG = Logger.getLogger(TaskConfig.class);
@@ -46,9 +47,12 @@ public class TaskConfig {
* Instantiate the task config
* @param command the command to invoke for the task
* @param configMap configuration to be passed as part of the invocation
+ * @param successOptional true if this task need not pass for the job to succeed, false
+ * otherwise
* @param id existing task ID
*/
- public TaskConfig(String command, Map<String, String> configMap, String id) {
+ public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
+ String id) {
if (configMap == null) {
configMap = Maps.newHashMap();
}
@@ -56,6 +60,8 @@ public class TaskConfig {
id = UUID.randomUUID().toString();
}
configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+ configMap.put(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString(),
+ Boolean.toString(successOptional));
configMap.put(TaskConfigFields.TASK_ID.toString(), id);
_configMap = configMap;
}
@@ -64,9 +70,11 @@ public class TaskConfig {
* Instantiate the task config
* @param command the command to invoke for the task
* @param configMap configuration to be passed as part of the invocation
+ * @param successOptional true if this task need not pass for the job to succeed, false
+ * otherwise
*/
- public TaskConfig(String command, Map<String, String> configMap) {
- this(command, configMap, null);
+ public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
+ this(command, configMap, successOptional, null);
}
/**
@@ -86,6 +94,19 @@ public class TaskConfig {
}
/**
+ * Check if this task must succeed for a job to succeed
+ * @return true if success is optional, false otherwise
+ */
+ public boolean isSuccessOptional() {
+ String successOptionalStr = _configMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
+ if (successOptionalStr == null) {
+ return false;
+ } else {
+ return Boolean.parseBoolean(successOptionalStr);
+ }
+ }
+
+ /**
* Get the configuration map for this task's command
* @return map of configuration key to value
*/
@@ -110,7 +131,7 @@ public class TaskConfig {
* @return instantiated TaskConfig
*/
public static TaskConfig from(TaskBean bean) {
- return new TaskConfig(bean.command, bean.taskConfigMap);
+ return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
}
/**
@@ -121,6 +142,9 @@ public class TaskConfig {
public static TaskConfig from(Map<String, String> rawConfigMap) {
String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
- return new TaskConfig(command, rawConfigMap, taskId);
+ String successOptionalStr = rawConfigMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
+ boolean successOptional =
+ (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : null;
+ return new TaskConfig(command, rawConfigMap, successOptional, taskId);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 457f0e0..849f339 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -202,6 +202,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// Used to keep track of tasks that have already been assigned to instances.
Set<Integer> assignedPartitions = new HashSet<Integer>();
+ // Used to keep track of tasks that have failed, but whose failure is acceptable
+ Set<Integer> skippedPartitions = new HashSet<Integer>();
+
// Keeps a mapping of (partition) -> (instance, state)
Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
@@ -223,8 +226,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
if (pendingState != null) {
// There is a pending state transition for this (partition, instance). Just copy forward
- // the state
- // assignment from the previous ideal state.
+ // the state assignment from the previous ideal state.
Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
if (stateMap != null) {
String prevState = stateMap.get(instance);
@@ -298,13 +300,34 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
pName, currState));
markPartitionError(jobCtx, pId, currState);
// The error policy is to fail the task as soon a single partition fails for a specified
- // maximum number of
- // attempts.
+ // maximum number of attempts.
if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
- workflowCtx.setJobState(jobResource, TaskState.FAILED);
- workflowCtx.setWorkflowState(TaskState.FAILED);
- addAllPartitions(allPartitions, partitionsToDropFromIs);
- return emptyAssignment(jobResource);
+ // If the user does not require this task to succeed in order for the job to succeed,
+ // then we don't have to fail the job right now
+ boolean successOptional = false;
+ String taskId = jobCtx.getTaskIdForPartition(pId);
+ if (taskId != null) {
+ TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+ if (taskConfig != null) {
+ successOptional = taskConfig.isSuccessOptional();
+ }
+ }
+
+ // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+ // to fail the job immediately
+ if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
+ successOptional = true;
+ }
+
+ if (!successOptional) {
+ workflowCtx.setJobState(jobResource, TaskState.FAILED);
+ workflowCtx.setWorkflowState(TaskState.FAILED);
+ addAllPartitions(allPartitions, partitionsToDropFromIs);
+ return emptyAssignment(jobResource);
+ } else {
+ skippedPartitions.add(pId);
+ partitionsToDropFromIs.add(pId);
+ }
}
}
break;
@@ -326,7 +349,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
pSet.removeAll(donePartitions);
}
- if (isJobComplete(jobCtx, allPartitions)) {
+ if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) {
workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
if (isWorkflowComplete(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.COMPLETED);
@@ -382,14 +405,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
* Checks if the job has completed.
* @param ctx The rebalancer context.
* @param allPartitions The set of partitions to check.
+ * @param skippedPartitions partitions that failed, but whose failure is acceptable
* @return true if all task partitions have been marked with status
* {@link TaskPartitionState#COMPLETED} in the rebalancer
* context, false otherwise.
*/
- private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions) {
+ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
+ Set<Integer> skippedPartitions) {
for (Integer pId : allPartitions) {
TaskPartitionState state = ctx.getPartitionState(pId);
- if (state != TaskPartitionState.COMPLETED) {
+ if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 5b27fb6..537f287 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -169,6 +169,8 @@ public class Workflow {
String.valueOf(job.numConcurrentTasksPerInstance));
builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
String.valueOf(job.timeoutPerPartition));
+ builder
+ .addConfig(job.name, JobConfig.FAILURE_THRESHOLD, String.valueOf(job.failureThreshold));
if (job.tasks != null) {
List<TaskConfig> taskConfigs = Lists.newArrayList();
for (TaskBean task : job.tasks) {
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 5e12f19..af5882c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -39,4 +39,5 @@ public class JobBean {
public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+ public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index eedccb5..97ecfc0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -29,4 +29,5 @@ import java.util.Map;
public class TaskBean {
public String command;
public Map<String, String> taskConfigMap;
+ public boolean successOptional = false;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index b2928e6..7041db8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -40,6 +40,7 @@ import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskResult.Status;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.Workflow;
@@ -50,6 +51,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Sets;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -130,8 +132,63 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
- TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
- TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", null, true);
+ TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
+ taskConfigs.add(taskConfig1);
+ taskConfigs.add(taskConfig2);
+ workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+ workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+ Map<String, String> jobConfigMap = Maps.newHashMap();
+ jobConfigMap.put("Timeout", "1000");
+ workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ _driver.start(workflowBuilder.build());
+
+ // Ensure the job completes
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+ // Ensure that each class was invoked
+ Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+ Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+ }
+
+ @Test
+ public void testThresholdFailure() throws Exception {
+ // Create a job with two different tasks
+ String jobName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+ Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+ TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
+ taskConfigs.add(taskConfig1);
+ taskConfigs.add(taskConfig2);
+ workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+ workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+ workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
+ Map<String, String> jobConfigMap = Maps.newHashMap();
+ jobConfigMap.put("Timeout", "1000");
+ workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ _driver.start(workflowBuilder.build());
+
+ // Ensure the job completes
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+ // Ensure that each class was invoked
+ Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+ Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+ }
+
+ @Test
+ public void testOptionalTaskFailure() throws Exception {
+ // Create a job with two different tasks
+ String jobName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+ Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, true);
+ TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
workflowBuilder.addTaskConfigs(jobName, taskConfigs);
@@ -151,13 +208,33 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
}
private class TaskOne extends ReindexTask {
+ private final boolean _shouldFail;
+
public TaskOne(TaskCallbackContext context) {
super(context);
+
+ // Check whether or not this task should succeed
+ TaskConfig taskConfig = context.getTaskConfig();
+ boolean shouldFail = false;
+ if (taskConfig != null) {
+ Map<String, String> configMap = taskConfig.getConfigMap();
+ if (configMap != null && configMap.containsKey("fail")
+ && Boolean.parseBoolean(configMap.get("fail"))) {
+ shouldFail = true;
+ }
+ }
+ _shouldFail = shouldFail;
}
@Override
public TaskResult run() {
_invokedClasses.add(getClass().getName());
+
+ // Fail the task if it should fail
+ if (_shouldFail) {
+ return new TaskResult(Status.ERROR, null);
+ }
+
return super.run();
}
}
[2/2] git commit: Merge branch 'helix-0.6.x' of
https://git-wip-us.apache.org/repos/asf/helix into helix-0.6.x
Posted by ka...@apache.org.
Merge branch 'helix-0.6.x' of https://git-wip-us.apache.org/repos/asf/helix into helix-0.6.x
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4ff7e388
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4ff7e388
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4ff7e388
Branch: refs/heads/helix-0.6.x
Commit: 4ff7e3888561b46ca6d9c590bdf425b0d2e0716b
Parents: efed884 f3b2c4f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu May 22 11:08:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu May 22 11:08:01 2014 -0700
----------------------------------------------------------------------
.../helix/webapp/resources/JsonParameters.java | 4 +
.../webapp/resources/ResourceGroupResource.java | 7 +
.../helix/webapp/TestDisableResource.java | 84 ++++++
helix-core/pom.xml | 12 +-
.../main/java/org/apache/helix/GroupCommit.java | 41 +--
.../main/java/org/apache/helix/HelixAdmin.java | 7 +
.../main/java/org/apache/helix/PropertyKey.java | 27 ++
.../java/org/apache/helix/PropertyType.java | 12 -
.../helix/controller/HelixControllerMain.java | 19 --
.../controller/rebalancer/AutoRebalancer.java | 3 +-
.../controller/rebalancer/CustomRebalancer.java | 17 +-
.../rebalancer/SemiAutoRebalancer.java | 3 +-
.../util/ConstraintBasedAssignment.java | 20 +-
.../restlet/ZKPropertyTransferServer.java | 248 -----------------
.../controller/restlet/ZNRecordUpdate.java | 83 ------
.../restlet/ZNRecordUpdateResource.java | 77 ------
.../restlet/ZkPropertyTransferApplication.java | 45 ----
.../restlet/ZkPropertyTransferClient.java | 177 ------------
.../helix/controller/restlet/package-info.java | 23 --
.../stages/BestPossibleStateCalcStage.java | 10 +-
.../stages/BestPossibleStateOutput.java | 39 ++-
.../controller/stages/ClusterDataCache.java | 45 +---
.../stages/ExternalViewComputeStage.java | 14 +-
.../stages/ResourceComputationStage.java | 5 +
.../manager/zk/DistributedLeaderElection.java | 10 -
.../manager/zk/ParticipantManagerHelper.java | 3 -
.../apache/helix/manager/zk/ZKHelixAdmin.java | 84 ++++--
.../helix/manager/zk/ZKHelixDataAccessor.java | 73 +----
.../apache/helix/manager/zk/ZKHelixManager.java | 14 +-
.../manager/zk/ZNRecordStreamingSerializer.java | 6 +-
.../messaging/handling/HelixTaskExecutor.java | 117 ++++----
.../java/org/apache/helix/model/IdealState.java | 20 +-
.../helix/monitoring/ZKPathDataDumpTask.java | 173 ++++++------
.../monitoring/mbeans/ClusterStatusMonitor.java | 174 ++++++++++--
.../monitoring/mbeans/InstanceMonitor.java | 4 +-
.../mbeans/PerInstanceResourceMonitor.java | 146 ++++++++++
.../mbeans/PerInstanceResourceMonitorMBean.java | 34 +++
.../monitoring/mbeans/ResourceMonitor.java | 21 +-
.../participant/HelixCustomCodeRunner.java | 20 +-
.../org/apache/helix/tools/ClusterSetup.java | 17 +-
.../stages/TestMessageThrottleStage.java | 2 +-
.../strategy/TestAutoRebalanceStrategy.java | 2 +-
.../helix/integration/TestAutoRebalance.java | 2 +-
.../TestAutoRebalancePartitionLimit.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 2 +-
.../TestDisableCustomCodeRunner.java | 252 +++++++++++++++++
.../helix/integration/TestDisableNode.java | 2 +-
.../helix/integration/TestDisablePartition.java | 2 +-
.../helix/integration/TestDisableResource.java | 268 +++++++++++++++++++
.../org/apache/helix/integration/TestDrop.java | 125 ++++++++-
.../helix/integration/TestDropResource.java | 2 +-
.../integration/TestHelixCustomCodeRunner.java | 41 +--
.../helix/integration/TestMessagingService.java | 4 +-
.../helix/integration/TestSchedulerMessage.java | 22 +-
.../integration/TestZkCallbackHandlerLeak.java | 77 +++---
...dAloneCMTestBaseWithPropertyServerCheck.java | 88 ------
.../manager/MockParticipantManager.java | 1 -
.../manager/TestConsecutiveZkSessionExpiry.java | 4 +-
.../TestDistributedControllerManager.java | 4 +-
.../manager/TestZkCallbackHandlerLeak.java | 76 +++---
.../manager/zk/TestLiveInstanceBounce.java | 4 +-
.../zk/TestZKPropertyTransferServer.java | 65 -----
.../helix/manager/zk/TestZkHelixAdmin.java | 32 ++-
.../manager/zk/TestZkStateChangeListener.java | 4 +-
.../helix/mock/participant/MockJobIntf.java | 28 --
.../mock/participant/MockMSModelFactory.java | 2 -
.../TestClusterStatusMonitorLifecycle.java | 40 ++-
.../monitoring/TestParticipantMonitor.java | 6 +-
.../helix/monitoring/TestStatCollector.java | 6 +-
.../monitoring/TestZKPathDataDumpTask.java | 113 ++++++++
.../mbeans/TestClusterStatusMonitor.java | 210 +++++++++------
.../monitoring/mbeans/TestResourceMonitor.java | 127 +++------
.../apache/helix/tools/TestClusterSetup.java | 36 ++-
pom.xml | 5 +
74 files changed, 1973 insertions(+), 1621 deletions(-)
----------------------------------------------------------------------