You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/22 20:09:19 UTC

[1/2] git commit: [HELIX-439] Support thresholding for job success/failure

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x f3b2c4f66 -> 4ff7e3888


[HELIX-439] Support thresholding for job success/failure


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/efed8848
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/efed8848
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/efed8848

Branch: refs/heads/helix-0.6.x
Commit: efed8848fee9eeda847336f3de93f7ce4458b6a4
Parents: e8ad448
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed May 21 16:10:17 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed May 21 16:10:17 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 27 ++++++-
 .../java/org/apache/helix/task/TaskConfig.java  | 36 +++++++--
 .../org/apache/helix/task/TaskRebalancer.java   | 47 +++++++++---
 .../java/org/apache/helix/task/Workflow.java    |  2 +
 .../org/apache/helix/task/beans/JobBean.java    |  1 +
 .../org/apache/helix/task/beans/TaskBean.java   |  1 +
 .../task/TestIndependentTaskRebalancer.java     | 81 +++++++++++++++++++-
 7 files changed, 174 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 90e3cfc..b166da1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -63,6 +63,8 @@ public class JobConfig {
   public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
   /** The number of concurrent tasks that are allowed to run on an instance. */
   public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+  /** The number of tasks within the job that are allowed to fail. */
+  public static final String FAILURE_THRESHOLD = "FailureThreshold";
 
   /** The individual task configurations, if any **/
   public static final String TASK_CONFIGS = "TaskConfigs";
@@ -72,6 +74,7 @@ public class JobConfig {
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
   public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+  public static final int DEFAULT_FAILURE_THRESHOLD = 0;
 
   private final String _workflow;
   private final String _targetResource;
@@ -82,12 +85,13 @@ public class JobConfig {
   private final long _timeoutPerTask;
   private final int _numConcurrentTasksPerInstance;
   private final int _maxAttemptsPerTask;
+  private final int _failureThreshold;
   private final Map<String, TaskConfig> _taskConfigMap;
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
-      Map<String, TaskConfig> taskConfigMap) {
+      int failureThreshold, Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -97,6 +101,7 @@ public class JobConfig {
     _timeoutPerTask = timeoutPerTask;
     _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
     _maxAttemptsPerTask = maxAttemptsPerTask;
+    _failureThreshold = failureThreshold;
     if (taskConfigMap != null) {
       _taskConfigMap = taskConfigMap;
     } else {
@@ -140,6 +145,10 @@ public class JobConfig {
     return _maxAttemptsPerTask;
   }
 
+  public int getFailureThreshold() {
+    return _failureThreshold;
+  }
+
   public Map<String, TaskConfig> getTaskConfigMap() {
     return _taskConfigMap;
   }
@@ -171,6 +180,7 @@ public class JobConfig {
     }
     cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
     cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+    cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
     return cfgMap;
   }
 
@@ -188,13 +198,14 @@ public class JobConfig {
     private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+    private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
 
     public JobConfig build() {
       validate();
 
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerTask, _taskConfigMap);
+          _maxAttemptsPerTask, _failureThreshold, _taskConfigMap);
     }
 
     /**
@@ -235,6 +246,9 @@ public class JobConfig {
       if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
         b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
       }
+      if (cfg.containsKey(FAILURE_THRESHOLD)) {
+        b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+      }
       return b;
     }
 
@@ -283,6 +297,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setFailureThreshold(int v) {
+      _failureThreshold = v;
+      return this;
+    }
+
     public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
       if (taskConfigs != null) {
         for (TaskConfig taskConfig : taskConfigs) {
@@ -321,6 +340,10 @@ public class JobConfig {
         throw new IllegalArgumentException(String.format("%s has invalid value %s",
             MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
       }
+      if (_failureThreshold < 0) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            FAILURE_THRESHOLD, _failureThreshold));
+      }
       if (_workflow == null) {
         throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 547ba48..4ddab1a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -35,7 +35,8 @@ import com.google.common.collect.Maps;
 public class TaskConfig {
   private enum TaskConfigFields {
     TASK_ID,
-    TASK_COMMAND
+    TASK_COMMAND,
+    TASK_SUCCESS_OPTIONAL
   }
 
   private static final Logger LOG = Logger.getLogger(TaskConfig.class);
@@ -46,9 +47,12 @@ public class TaskConfig {
    * Instantiate the task config
    * @param command the command to invoke for the task
    * @param configMap configuration to be passed as part of the invocation
+   * @param successOptional true if this task need not pass for the job to succeed, false
+   *          otherwise
    * @param id existing task ID
    */
-  public TaskConfig(String command, Map<String, String> configMap, String id) {
+  public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
+      String id) {
     if (configMap == null) {
       configMap = Maps.newHashMap();
     }
@@ -56,6 +60,8 @@ public class TaskConfig {
       id = UUID.randomUUID().toString();
     }
     configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+    configMap.put(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString(),
+        Boolean.toString(successOptional));
     configMap.put(TaskConfigFields.TASK_ID.toString(), id);
     _configMap = configMap;
   }
@@ -64,9 +70,11 @@ public class TaskConfig {
    * Instantiate the task config
    * @param command the command to invoke for the task
    * @param configMap configuration to be passed as part of the invocation
+   * @param successOptional true if this task need not pass for the job to succeed, false
+   *          otherwise
    */
-  public TaskConfig(String command, Map<String, String> configMap) {
-    this(command, configMap, null);
+  public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
+    this(command, configMap, successOptional, null);
   }
 
   /**
@@ -86,6 +94,19 @@ public class TaskConfig {
   }
 
   /**
+   * Check whether this task's success is optional, i.e. the job may still succeed if it fails
+   * @return true if success is optional, false otherwise
+   */
+  public boolean isSuccessOptional() {
+    String successOptionalStr = _configMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
+    if (successOptionalStr == null) {
+      return false;
+    } else {
+      return Boolean.parseBoolean(successOptionalStr);
+    }
+  }
+
+  /**
    * Get the configuration map for this task's command
    * @return map of configuration key to value
    */
@@ -110,7 +131,7 @@ public class TaskConfig {
    * @return instantiated TaskConfig
    */
   public static TaskConfig from(TaskBean bean) {
-    return new TaskConfig(bean.command, bean.taskConfigMap);
+    return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
   }
 
   /**
@@ -121,6 +142,9 @@ public class TaskConfig {
   public static TaskConfig from(Map<String, String> rawConfigMap) {
     String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
     String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
-    return new TaskConfig(command, rawConfigMap, taskId);
+    String successOptionalStr = rawConfigMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
+    // Boolean.parseBoolean(null) is false, so a missing key means "success required" (no NPE)
+    boolean successOptional = Boolean.parseBoolean(successOptionalStr);
+    return new TaskConfig(command, rawConfigMap, successOptional, taskId);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 457f0e0..849f339 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -202,6 +202,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Used to keep track of tasks that have already been assigned to instances.
     Set<Integer> assignedPartitions = new HashSet<Integer>();
 
+    // Used to keep track of tasks that have failed, but whose failure is acceptable
+    Set<Integer> skippedPartitions = new HashSet<Integer>();
+
     // Keeps a mapping of (partition) -> (instance, state)
     Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
 
@@ -223,8 +226,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
             currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
         if (pendingState != null) {
           // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state
-          // assignment from the previous ideal state.
+          // the state assignment from the previous ideal state.
           Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
           if (stateMap != null) {
             String prevState = stateMap.get(instance);
@@ -298,13 +300,34 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
               pName, currState));
           markPartitionError(jobCtx, pId, currState);
           // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of
-          // attempts.
+          // maximum number of attempts.
           if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
-            workflowCtx.setJobState(jobResource, TaskState.FAILED);
-            workflowCtx.setWorkflowState(TaskState.FAILED);
-            addAllPartitions(allPartitions, partitionsToDropFromIs);
-            return emptyAssignment(jobResource);
+            // If the user does not require this task to succeed in order for the job to succeed,
+            // then we don't have to fail the job right now
+            boolean successOptional = false;
+            String taskId = jobCtx.getTaskIdForPartition(pId);
+            if (taskId != null) {
+              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+              if (taskConfig != null) {
+                successOptional = taskConfig.isSuccessOptional();
+              }
+            }
+
+            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+            // to fail the job immediately
+            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
+              successOptional = true;
+            }
+
+            if (!successOptional) {
+              workflowCtx.setJobState(jobResource, TaskState.FAILED);
+              workflowCtx.setWorkflowState(TaskState.FAILED);
+              addAllPartitions(allPartitions, partitionsToDropFromIs);
+              return emptyAssignment(jobResource);
+            } else {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+            }
           }
         }
           break;
@@ -326,7 +349,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       pSet.removeAll(donePartitions);
     }
 
-    if (isJobComplete(jobCtx, allPartitions)) {
+    if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) {
       workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.COMPLETED);
@@ -382,14 +405,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    * Checks if the job has completed.
    * @param ctx The rebalancer context.
    * @param allPartitions The set of partitions to check.
+   * @param skippedPartitions partitions that failed, but whose failure is acceptable
    * @return true if all task partitions have been marked with status
    *         {@link TaskPartitionState#COMPLETED} in the rebalancer
    *         context, false otherwise.
    */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions) {
+  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
+      Set<Integer> skippedPartitions) {
     for (Integer pId : allPartitions) {
       TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state != TaskPartitionState.COMPLETED) {
+      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 5b27fb6..537f287 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -169,6 +169,8 @@ public class Workflow {
           String.valueOf(job.numConcurrentTasksPerInstance));
       builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
           String.valueOf(job.timeoutPerPartition));
+      builder
+          .addConfig(job.name, JobConfig.FAILURE_THRESHOLD, String.valueOf(job.failureThreshold));
       if (job.tasks != null) {
         List<TaskConfig> taskConfigs = Lists.newArrayList();
         for (TaskBean task : job.tasks) {

http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 5e12f19..af5882c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -39,4 +39,5 @@ public class JobBean {
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+  public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index eedccb5..97ecfc0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -29,4 +29,5 @@ import java.util.Map;
 public class TaskBean {
   public String command;
   public Map<String, String> taskConfigMap;
+  public boolean successOptional = false;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/efed8848/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index b2928e6..7041db8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -40,6 +40,7 @@ import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskResult.Status;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.Workflow;
@@ -50,6 +51,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.collections.Sets;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -130,8 +132,63 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
-    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null, true);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
+    taskConfigs.add(taskConfig1);
+    taskConfigs.add(taskConfig2);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that each class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+    Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+  }
+
+  @Test
+  public void testThresholdFailure() throws Exception {
+    // Create a job with two different tasks
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+    Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
+    taskConfigs.add(taskConfig1);
+    taskConfigs.add(taskConfig2);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that each class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+    Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+  }
+
+  @Test
+  public void testOptionalTaskFailure() throws Exception {
+    // Create a job with two different tasks
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+    Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true));
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, true);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
     workflowBuilder.addTaskConfigs(jobName, taskConfigs);
@@ -151,13 +208,33 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
   }
 
   private class TaskOne extends ReindexTask {
+    private final boolean _shouldFail;
+
     public TaskOne(TaskCallbackContext context) {
       super(context);
+
+      // Check whether or not this task should succeed
+      TaskConfig taskConfig = context.getTaskConfig();
+      boolean shouldFail = false;
+      if (taskConfig != null) {
+        Map<String, String> configMap = taskConfig.getConfigMap();
+        if (configMap != null && configMap.containsKey("fail")
+            && Boolean.parseBoolean(configMap.get("fail"))) {
+          shouldFail = true;
+        }
+      }
+      _shouldFail = shouldFail;
     }
 
     @Override
     public TaskResult run() {
       _invokedClasses.add(getClass().getName());
+
+      // Fail the task if it should fail
+      if (_shouldFail) {
+        return new TaskResult(Status.ERROR, null);
+      }
+
       return super.run();
     }
   }


[2/2] git commit: Merge branch 'helix-0.6.x' of https://git-wip-us.apache.org/repos/asf/helix into helix-0.6.x

Posted by ka...@apache.org.
Merge branch 'helix-0.6.x' of https://git-wip-us.apache.org/repos/asf/helix into helix-0.6.x


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4ff7e388
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4ff7e388
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4ff7e388

Branch: refs/heads/helix-0.6.x
Commit: 4ff7e3888561b46ca6d9c590bdf425b0d2e0716b
Parents: efed884 f3b2c4f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu May 22 11:08:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu May 22 11:08:01 2014 -0700

----------------------------------------------------------------------
 .../helix/webapp/resources/JsonParameters.java  |   4 +
 .../webapp/resources/ResourceGroupResource.java |   7 +
 .../helix/webapp/TestDisableResource.java       |  84 ++++++
 helix-core/pom.xml                              |  12 +-
 .../main/java/org/apache/helix/GroupCommit.java |  41 +--
 .../main/java/org/apache/helix/HelixAdmin.java  |   7 +
 .../main/java/org/apache/helix/PropertyKey.java |  27 ++
 .../java/org/apache/helix/PropertyType.java     |  12 -
 .../helix/controller/HelixControllerMain.java   |  19 --
 .../controller/rebalancer/AutoRebalancer.java   |   3 +-
 .../controller/rebalancer/CustomRebalancer.java |  17 +-
 .../rebalancer/SemiAutoRebalancer.java          |   3 +-
 .../util/ConstraintBasedAssignment.java         |  20 +-
 .../restlet/ZKPropertyTransferServer.java       | 248 -----------------
 .../controller/restlet/ZNRecordUpdate.java      |  83 ------
 .../restlet/ZNRecordUpdateResource.java         |  77 ------
 .../restlet/ZkPropertyTransferApplication.java  |  45 ----
 .../restlet/ZkPropertyTransferClient.java       | 177 ------------
 .../helix/controller/restlet/package-info.java  |  23 --
 .../stages/BestPossibleStateCalcStage.java      |  10 +-
 .../stages/BestPossibleStateOutput.java         |  39 ++-
 .../controller/stages/ClusterDataCache.java     |  45 +---
 .../stages/ExternalViewComputeStage.java        |  14 +-
 .../stages/ResourceComputationStage.java        |   5 +
 .../manager/zk/DistributedLeaderElection.java   |  10 -
 .../manager/zk/ParticipantManagerHelper.java    |   3 -
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  84 ++++--
 .../helix/manager/zk/ZKHelixDataAccessor.java   |  73 +----
 .../apache/helix/manager/zk/ZKHelixManager.java |  14 +-
 .../manager/zk/ZNRecordStreamingSerializer.java |   6 +-
 .../messaging/handling/HelixTaskExecutor.java   | 117 ++++----
 .../java/org/apache/helix/model/IdealState.java |  20 +-
 .../helix/monitoring/ZKPathDataDumpTask.java    | 173 ++++++------
 .../monitoring/mbeans/ClusterStatusMonitor.java | 174 ++++++++++--
 .../monitoring/mbeans/InstanceMonitor.java      |   4 +-
 .../mbeans/PerInstanceResourceMonitor.java      | 146 ++++++++++
 .../mbeans/PerInstanceResourceMonitorMBean.java |  34 +++
 .../monitoring/mbeans/ResourceMonitor.java      |  21 +-
 .../participant/HelixCustomCodeRunner.java      |  20 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  17 +-
 .../stages/TestMessageThrottleStage.java        |   2 +-
 .../strategy/TestAutoRebalanceStrategy.java     |   2 +-
 .../helix/integration/TestAutoRebalance.java    |   2 +-
 .../TestAutoRebalancePartitionLimit.java        |   2 +-
 .../TestCustomizedIdealStateRebalancer.java     |   2 +-
 .../TestDisableCustomCodeRunner.java            | 252 +++++++++++++++++
 .../helix/integration/TestDisableNode.java      |   2 +-
 .../helix/integration/TestDisablePartition.java |   2 +-
 .../helix/integration/TestDisableResource.java  | 268 +++++++++++++++++++
 .../org/apache/helix/integration/TestDrop.java  | 125 ++++++++-
 .../helix/integration/TestDropResource.java     |   2 +-
 .../integration/TestHelixCustomCodeRunner.java  |  41 +--
 .../helix/integration/TestMessagingService.java |   4 +-
 .../helix/integration/TestSchedulerMessage.java |  22 +-
 .../integration/TestZkCallbackHandlerLeak.java  |  77 +++---
 ...dAloneCMTestBaseWithPropertyServerCheck.java |  88 ------
 .../manager/MockParticipantManager.java         |   1 -
 .../manager/TestConsecutiveZkSessionExpiry.java |   4 +-
 .../TestDistributedControllerManager.java       |   4 +-
 .../manager/TestZkCallbackHandlerLeak.java      |  76 +++---
 .../manager/zk/TestLiveInstanceBounce.java      |   4 +-
 .../zk/TestZKPropertyTransferServer.java        |  65 -----
 .../helix/manager/zk/TestZkHelixAdmin.java      |  32 ++-
 .../manager/zk/TestZkStateChangeListener.java   |   4 +-
 .../helix/mock/participant/MockJobIntf.java     |  28 --
 .../mock/participant/MockMSModelFactory.java    |   2 -
 .../TestClusterStatusMonitorLifecycle.java      |  40 ++-
 .../monitoring/TestParticipantMonitor.java      |   6 +-
 .../helix/monitoring/TestStatCollector.java     |   6 +-
 .../monitoring/TestZKPathDataDumpTask.java      | 113 ++++++++
 .../mbeans/TestClusterStatusMonitor.java        | 210 +++++++++------
 .../monitoring/mbeans/TestResourceMonitor.java  | 127 +++------
 .../apache/helix/tools/TestClusterSetup.java    |  36 ++-
 pom.xml                                         |   5 +
 74 files changed, 1973 insertions(+), 1621 deletions(-)
----------------------------------------------------------------------