Posted to commits@helix.apache.org by lx...@apache.org on 2016/06/16 23:36:10 UTC

[1/4] helix git commit: [HELIX-618] Job hangs if the target resource no longer exists by the time it is scheduled.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 6b6bb8f60 -> fe540ac9e


[HELIX-618] Job hangs if the target resource no longer exists by the time it is scheduled.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fe540ac9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fe540ac9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fe540ac9

Branch: refs/heads/helix-0.6.x
Commit: fe540ac9ec93fb3fb1caa71acaede9c3a63e9fd4
Parents: d381a3a
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 16:34:31 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../FixedTargetTaskAssignmentCalculator.java    |  14 +-
 .../org/apache/helix/task/JobRebalancer.java    |  51 +++--
 .../org/apache/helix/task/TaskRebalancer.java   |  41 ++--
 .../apache/helix/task/WorkflowRebalancer.java   |  31 +--
 .../task/TestRunJobsWithMissingTarget.java      | 214 +++++++++++++++++++
 5 files changed, 306 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
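
For context, the hang was easy to reproduce from a client: enqueue a fixed-target job, then drop its target resource before the job runs. A condensed reproduction sketch, reusing the fixture of the new integration test below (_driver, _setupTool, _manager and CLUSTER_NAME come from that test; the resource name is illustrative):

    JobConfig.Builder job = new JobConfig.Builder()
        .setCommand(MockTask.TASK_COMMAND)
        .setTargetResource("TestDB2")                         // fixed-target job
        .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
    JobQueue.Builder queue = new JobQueue.Builder("repro");
    queue.enqueueJob("jobTestDB2", job);
    _driver.start(queue.build());
    _setupTool.dropResourceFromCluster(CLUSTER_NAME, "TestDB2"); // target disappears
    // Before this fix the job stayed IN_PROGRESS forever; with it, the job and
    // the workflow are marked FAILED (jobs are namespaced as queue_job):
    TaskTestUtil.pollForJobState(_manager, "repro", "repro_jobTestDB2", TaskState.FAILED);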


http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 8760524..60cd92f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -37,6 +37,7 @@ import org.apache.helix.model.ResourceAssignment;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.log4j.Logger;
 
 /**
  * A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target
@@ -44,6 +45,7 @@ import com.google.common.collect.Sets;
  * (if desired) only where those partitions are in a given state.
  */
 public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
+  private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
 
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
@@ -58,6 +60,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
       Set<Integer> partitionSet, ClusterDataCache cache) {
     IdealState tgtIs = getTgtIdealState(jobCfg, cache);
     if (tgtIs == null) {
+      LOG.warn("Missing target resource for the scheduled job!");
       return Collections.emptyMap();
     }
     Set<String> tgtStates = jobCfg.getTargetPartitionStates();
@@ -78,21 +81,22 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
 
   /**
    * Returns the set of all partition ids for a job.
-   * <p/>
    * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
    * use the list of all partition ids from the target resource.
+   * Returns an empty set if the target resource does not exist.
    */
   private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
       JobContext taskCtx) {
-    if (tgtResourceIs == null) {
-      return null;
-    }
     Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
     SortedSet<String> targetPartitions = Sets.newTreeSet();
     if (jobCfg.getTargetPartitions() != null) {
       targetPartitions.addAll(jobCfg.getTargetPartitions());
     } else {
-      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+      if (tgtResourceIs != null) {
+        targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+      } else {
+        LOG.warn("Missing target resource for the scheduled job!");
+      }
     }
 
     Set<Integer> taskPartitions = Sets.newTreeSet();

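The shape of the fix above: instead of returning null when the target IdealState is gone, fall through to an empty set so the caller can take a deterministic failure path. A minimal standalone model of that shape (hypothetical names, no Helix types):

    import java.util.*;

    class TargetPartitionDemo {
      // Explicit config wins; otherwise use the target's partitions; if the
      // target is gone, return an empty set instead of null.
      static SortedSet<String> targetPartitions(List<String> explicit, Set<String> fromTarget) {
        SortedSet<String> result = new TreeSet<String>();
        if (explicit != null) {
          result.addAll(explicit);
        } else if (fromTarget != null) {
          result.addAll(fromTarget);
        }
        return result;
      }

      public static void main(String[] args) {
        // Target dropped and no explicit partitions: prints [] (never null).
        System.out.println(targetPartitions(null, null));
      }
    }
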
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0e2ab15..93d4689 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -31,7 +31,6 @@ import org.apache.log4j.Logger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -206,9 +205,23 @@ public class JobRebalancer extends TaskRebalancer {
     TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
     Set<Integer> allPartitions =
         taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+
+    if (allPartitions == null || allPartitions.isEmpty()) {
+      // No target partitions found: mark the job as FAILED.
+      LOG.warn(
+          "Missing task partition mapping for job " + jobResource + ", marking the job as FAILED!");
+      markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
+      markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
+      return new ResourceAssignment(jobResource);
+    }
+
     Map<String, SortedSet<Integer>> taskAssignments =
         getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
     long currentTime = System.currentTimeMillis();
+
+    LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments
+        + " excludedInstances: " + excludedInstances);
+
     for (String instance : taskAssignments.keySet()) {
       if (excludedInstances.contains(instance)) {
         continue;
@@ -322,13 +335,7 @@ public class JobRebalancer extends TaskRebalancer {
             }
 
             if (!successOptional) {
-              long finishTime = currentTime;
-              workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              if (workflowConfig.isTerminable()) {
-                workflowCtx.setWorkflowState(TaskState.FAILED);
-                workflowCtx.setFinishTime(finishTime);
-              }
-              jobCtx.setFinishTime(finishTime);
+              markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
               markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);
 
@@ -367,13 +374,7 @@ public class JobRebalancer extends TaskRebalancer {
     scheduleForNextTask(jobResource, jobCtx, currentTime);
 
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
-      jobCtx.setFinishTime(currentTime);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-      }
-
+      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
       // remove IdealState of this job
       TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }
@@ -428,6 +429,26 @@ public class JobRebalancer extends TaskRebalancer {
     return ra;
   }
 
+  private void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.FAILED);
+    jobContext.setFinishTime(currentTime);
+    if (isWorkflowFinished(workflowContext, workflowConfig)) {
+      workflowContext.setFinishTime(currentTime);
+    }
+  }
+
+  private void markJobComplete(String jobName, JobContext jobContext,
+      WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.COMPLETED);
+    jobContext.setFinishTime(currentTime);
+    if (isWorkflowFinished(workflowContext, workflowConfig)) {
+      workflowContext.setFinishTime(currentTime);
+    }
+  }
+
   private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
     // Clear current entries if they exist and are expired
     long currentTime = now;

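The two new helpers differ only in the terminal TaskState they record, so both paths now share the same finish-time bookkeeping. If they were ever folded further, a single parameterized helper would look like this (a hypothetical sketch, not part of this commit):

    private void markJobFinished(String jobName, TaskState state, JobContext jobContext,
        WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
      long currentTime = System.currentTimeMillis();
      workflowContext.setJobState(jobName, state);   // FAILED or COMPLETED
      jobContext.setFinishTime(currentTime);
      // Only stamp the workflow finish time once the whole workflow is done.
      if (isWorkflowFinished(workflowContext, workflowConfig)) {
        workflowContext.setFinishTime(currentTime);
      }
    }
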
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1526883..f35ce69 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -62,22 +62,33 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       CurrentStateOutput currStateOutput);
 
   /**
-   * Checks if the workflow has completed.
+   * Checks if the workflow has finished (either completed or failed).
+   * Sets the workflow state in the context accordingly.
    *
    * @param ctx Workflow context containing job states
    * @param cfg Workflow config containing set of jobs
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
+   * @return true if the workflow either completed (all jobs are {@link TaskState#COMPLETED})
+   * or failed (any job is {@link TaskState#FAILED}); false otherwise.
    */
-  protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    if (!cfg.isTerminable()) {
-      return false;
-    }
+  protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) {
+    boolean incomplete = false;
     for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.COMPLETED) {
-        return false;
+      TaskState jobState = ctx.getJobState(job);
+      if (jobState == TaskState.FAILED) {
+        ctx.setWorkflowState(TaskState.FAILED);
+        return true;
+      }
+      if (jobState != TaskState.COMPLETED) {
+        incomplete = true;
       }
     }
-    return true;
+
+    if (!incomplete && cfg.isTerminable()) {
+      ctx.setWorkflowState(TaskState.COMPLETED);
+      return true;
+    }
+
+    return false;
   }
 
   /**
@@ -124,6 +135,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       WorkflowContext workflowCtx) {
     int notStartedCount = 0;
     int inCompleteCount = 0;
+    int failedCount = 0;
 
     for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
       TaskState jobState = workflowCtx.getJobState(ancestor);
@@ -131,13 +143,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         ++notStartedCount;
       } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
         ++inCompleteCount;
+      } else if (jobState == TaskState.FAILED) {
+        ++failedCount;
       }
     }
 
-    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
-      LOG.debug(String
-          .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.",
-              job, notStartedCount, inCompleteCount));
+    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()
+        || failedCount > 0) {
+      LOG.debug(String.format(
+          "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.",
+          job, notStartedCount, inCompleteCount, failedCount));
       return false;
     }
 

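The contract of isWorkflowFinished() is worth spelling out: a single FAILED job finishes the whole workflow as FAILED, while COMPLETED is only reached when every job completed and the workflow is terminable (recurring queues never self-complete). A standalone model of that decision, runnable without Helix (names are illustrative):

    import java.util.Arrays;
    import java.util.Collection;

    public class WorkflowFinishDemo {
      enum JobState { NOT_STARTED, IN_PROGRESS, STOPPED, COMPLETED, FAILED }

      static String evaluate(Collection<JobState> jobStates, boolean terminable) {
        boolean incomplete = false;
        for (JobState s : jobStates) {
          if (s == JobState.FAILED) return "FAILED";      // one failure finishes the workflow
          if (s != JobState.COMPLETED) incomplete = true;
        }
        return (!incomplete && terminable) ? "COMPLETED" : "UNFINISHED";
      }

      public static void main(String[] args) {
        System.out.println(evaluate(Arrays.asList(JobState.COMPLETED, JobState.FAILED), true));     // FAILED
        System.out.println(evaluate(Arrays.asList(JobState.COMPLETED, JobState.COMPLETED), false)); // UNFINISHED
      }
    }
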
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 912f501..db5426c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -77,17 +77,15 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     long currentTime = System.currentTimeMillis();
-    // Check if workflow is completed and mark it if it is completed.
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-      if (isWorkflowComplete(workflowCtx, workflowCfg)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
-      }
+    // Check whether the workflow has finished, and mark it if so.
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
+        && isWorkflowFinished(workflowCtx, workflowCfg)) {
+      workflowCtx.setFinishTime(currentTime);
+      TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     }
 
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-      LOG.info("Workflow " + workflow + " is completed.");
+      LOG.info("Workflow " + workflow + " is finished.");
       long expiryTime = workflowCfg.getExpiry();
       // Check if this workflow has been finished past its expiry.
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
@@ -162,10 +160,19 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     // Set up job resource based on partitions from target resource
     int numIndependentTasks = jobConfig.getTaskConfigMap().size();
-    int numPartitions = (numIndependentTasks > 0) ?
-        numIndependentTasks :
-        admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource())
-            .getPartitionSet().size();
+
+    int numPartitions = numIndependentTasks;
+    if (numPartitions == 0) {
+      IdealState targetIs =
+          admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
+      if (targetIs == null) {
+        LOG.warn("Target resource does not exist for job " + jobResource);
+        // No need to fail here; the job will be marked FAILED as soon as it starts running.
+      } else {
+        numPartitions = targetIs.getPartitionSet().size();
+      }
+    }
+
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 

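Condensed, the new partition-count fallback reads as follows (variables as in the hunk above): only consult the target IdealState when the job has no independent tasks, and tolerate its absence.

    int numPartitions = jobConfig.getTaskConfigMap().size();  // independent tasks first
    if (numPartitions == 0) {
      IdealState targetIs =
          admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
      if (targetIs != null) {
        numPartitions = targetIs.getPartitionSet().size();    // fall back to the target resource
      }
      // else keep 0; JobRebalancer will mark the job FAILED when it starts.
    }
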
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
new file mode 100644
index 0000000..74a8610
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -0,0 +1,214 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestRunJobsWithMissingTarget.class);
+  private static final int num_nodes = 5;
+  private static final int num_dbs = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+  private ClusterControllerManager _controller;
+  private ClusterSetup _setupTool;
+
+  private List<String> _test_dbs = new ArrayList<String>();
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < num_nodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target dbs
+    for (int i = 0; i < num_dbs; i++) {
+      String db = "TestDB" + i;
+      _setupTool
+          .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
+              IdealState.RebalanceMode.FULL_AUTO.toString());
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+      _test_dbs.add(db);
+    }
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < num_nodes; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < num_nodes; i++) {
+      _participants[i].syncStop();
+    }
+    _manager.disconnect();
+  }
+
+  private JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
+    cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
+    cal.set(Calendar.MILLISECOND, 0);
+    cfgMap.put(WorkflowConfig.START_TIME,
+        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+  }
+
+  private JobQueue.Builder buildJobQueue(String jobQueueName) {
+    return buildJobQueue(jobQueueName, 0);
+  }
+
+  @Test public void testJobFailsWithMissingTarget() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = buildJobQueue(queueName);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+  }
+
+  @Test public void testJobFailsWithMissingTargetInRunning() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = buildJobQueue(queueName);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+  }
+}


[2/4] helix git commit: [HELIX-623] Do not expose internal configuration field names; clients should use JobConfig.Builder to create a JobConfig.

Posted by lx...@apache.org.
[HELIX-623] Do not expose internal configuration field names; clients should use JobConfig.Builder to create a JobConfig.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79c490fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79c490fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79c490fa

Branch: refs/heads/helix-0.6.x
Commit: 79c490fab080494b68a1f52845c1e708b8881439
Parents: 2409601
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 15:59:37 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/helix/model/ResourceConfig.java  |  61 +++++
 .../java/org/apache/helix/task/JobConfig.java   | 258 +++++++++++--------
 .../java/org/apache/helix/task/TaskDriver.java  |   1 +
 .../java/org/apache/helix/task/TaskUtil.java    |  21 +-
 .../java/org/apache/helix/task/Workflow.java    |  37 +--
 .../task/TestIndependentTaskRebalancer.java     |  93 +++----
 .../integration/task/TestRecurringJobQueue.java |   4 +-
 .../integration/task/TestTaskRebalancer.java    |  39 +--
 .../task/TestTaskRebalancerRetryLimit.java      |  18 +-
 .../task/TestTaskRebalancerStopResume.java      |  10 +-
 .../integration/task/WorkflowGenerator.java     |  56 ++--
 11 files changed, 354 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
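
For client code the practical change is the entry point: the raw field-name constants are no longer public, so jobs are defined through JobConfig.Builder and attached to a workflow as a builder. A before/after sketch based on the updated tests below (resource and command names are illustrative):

    // Before (raw field names, removed from the public API):
    //   workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
    //   workflowBuilder.addConfig(jobName, JobConfig.TARGET_RESOURCE, "TestDB");
    // After (typed builder):
    JobConfig.Builder jobBuilder = new JobConfig.Builder()
        .setCommand("DummyCommand")
        .setTargetResource("TestDB")
        .setTargetPartitionStates(Sets.newHashSet("MASTER"));
    workflowBuilder.addJobConfig(jobName, jobBuilder);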


http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 98433f5..d58126d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -23,6 +23,9 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.log4j.Logger;
 
+import java.util.Collections;
+import java.util.Map;
+
 /**
  * Resource configurations
  */
@@ -73,6 +76,64 @@ public class ResourceConfig extends HelixProperty {
         .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
   }
 
+  /**
+   * Put a set of simple configs.
+   *
+   * @param configsMap
+   */
+  public void putSimpleConfigs(Map<String, String> configsMap) {
+    getRecord().getSimpleFields().putAll(configsMap);
+  }
+
+  /**
+   * Get all simple configurations.
+   *
+   * @return all simple configurations.
+   */
+  public Map<String, String> getSimpleConfigs() {
+    return Collections.unmodifiableMap(getRecord().getSimpleFields());
+  }
+
+  /**
+   * Put a single simple config value.
+   *
+   * @param configKey
+   * @param configVal
+   */
+  public void putSimpleConfig(String configKey, String configVal) {
+    getRecord().getSimpleFields().put(configKey, configVal);
+  }
+
+  /**
+   * Get a single simple config value.
+   *
+   * @param configKey
+   * @return the configuration value, or null if it does not exist.
+   */
+  public String getSimpleConfig(String configKey) {
+    return getRecord().getSimpleFields().get(configKey);
+  }
+
+  /**
+   * Put a single map config.
+   *
+   * @param configKey
+   * @param configValMap
+   */
+  public void putMapConfig(String configKey, Map<String, String> configValMap) {
+    getRecord().setMapField(configKey, configValMap);
+  }
+
+  /**
+   * Get a single map config.
+   *
+   * @param configKey
+   * @return the configuration value map, or null if it does not exist.
+   */
+  public Map<String, String> getMapConfig(String configKey) {
+    return getRecord().getMapField(configKey);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ResourceConfig) {

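A short usage sketch for the new ResourceConfig accessors; the keys and values here are illustrative, and cfg is any ResourceConfig instance:

    // Simple (string) configs live in the record's simple fields.
    static void demo(ResourceConfig cfg) {
      cfg.putSimpleConfig("owner", "data-infra");
      String owner = cfg.getSimpleConfig("owner");            // "data-infra"

      // Map configs live in the record's map fields, one map per key.
      Map<String, String> alerting = new HashMap<String, String>();
      alerting.put("email", "oncall@example.com");
      cfg.putMapConfig("alerting", alerting);

      Map<String, String> readBack = cfg.getMapConfig("alerting"); // null if the key is absent
      System.out.println(owner + " " + readBack);
    }
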
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index c7c2f38..37a2f35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -36,49 +36,87 @@ import com.google.common.collect.Maps;
  * Provides a typed interface to job configurations.
  */
 public class JobConfig {
-  // // Property names ////
-
-  /** The name of the workflow to which the job belongs. */
-  public static final String WORKFLOW_ID = "WorkflowID";
-  /** The assignment strategy of this job */
-  public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
-  /** The name of the target resource. */
-  public static final String TARGET_RESOURCE = "TargetResource";
-  /**
-   * The set of the target partition states. The value must be a comma-separated list of partition
-   * states.
-   */
-  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+
   /**
-   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+   * Do not use these values directly; always use the get/set methods in JobConfig and JobConfig.Builder.
    */
-  public static final String TARGET_PARTITIONS = "TargetPartitions";
-  /** The command that is to be run by participants in the case of identical tasks. */
-  public static final String COMMAND = "Command";
-  /** The command configuration to be used by the tasks. */
-  public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig";
-  /** The timeout for a task. */
-  public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
-  /** The maximum number of times the task rebalancer may attempt to execute a task. */
-  public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
-  /** The maximum number of times Helix will intentionally move a failing task */
-  public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask";
-  /** The number of concurrent tasks that are allowed to run on an instance. */
-  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
-  /** The number of tasks within the job that are allowed to fail. */
-  public static final String FAILURE_THRESHOLD = "FailureThreshold";
-  /** The amount of time in ms to wait before retrying a task */
-  public static final String TASK_RETRY_DELAY = "TaskRetryDelay";
-
-  /** The individual task configurations, if any **/
-  public static final String TASK_CONFIGS = "TaskConfigs";
-
-  /** Disable external view (not showing) for this job resource */
-  public static final String DISABLE_EXTERNALVIEW = "DisableExternalView";
-
-
-  // // Default property values ////
+  protected enum JobConfigProperty {
+    /**
+     * The name of the workflow to which the job belongs.
+     */
+    WORKFLOW_ID("WorkflowID"),
+    /**
+     * The assignment strategy of this job
+     */
+    ASSIGNMENT_STRATEGY("AssignmentStrategy"),
+    /**
+     * The name of the target resource.
+     */
+    TARGET_RESOURCE("TargetResource"),
+    /**
+     * The set of the target partition states. The value must be a comma-separated list of partition
+     * states.
+     */
+    TARGET_PARTITION_STATES("TargetPartitionStates"),
+    /**
+     * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+     */
+    TARGET_PARTITIONS("TargetPartitions"),
+    /**
+     * The command that is to be run by participants in the case of identical tasks.
+     */
+    COMMAND("Command"),
+    /**
+     * The command configuration to be used by the tasks.
+     */
+    JOB_COMMAND_CONFIG_MAP("JobCommandConfig"),
+    /**
+     * The timeout for a task.
+     */
+    TIMEOUT_PER_TASK("TimeoutPerPartition"),
+    /**
+     * The maximum number of times the task rebalancer may attempt to execute a task.
+     */
+    MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"),
+    /**
+     * The maximum number of times Helix will intentionally move a failing task
+     */
+    MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"),
+    /**
+     * The number of concurrent tasks that are allowed to run on an instance.
+     */
+    NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"),
+    /**
+     * The number of tasks within the job that are allowed to fail.
+     */
+    FAILURE_THRESHOLD("FailureThreshold"),
+    /**
+     * The amount of time in ms to wait before retrying a task
+     */
+    TASK_RETRY_DELAY("TaskRetryDelay"),
+
+    /**
+     * The individual task configurations, if any
+     */
+    TASK_CONFIGS("TaskConfigs"),
+
+    /**
+     * Disable external view (not showing) for this job resource
+     */
+    DISABLE_EXTERNALVIEW("DisableExternalView");
+
+    private final String _value;
+
+    private JobConfigProperty(String val) {
+      _value = val;
+    }
+
+    public String value() {
+      return _value;
+    }
+  }
 
+  // Default property values
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -106,8 +144,7 @@ public class JobConfig {
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
-      boolean disableExternalView,
-      Map<String, TaskConfig> taskConfigMap) {
+      boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -190,34 +227,39 @@ public class JobConfig {
 
   public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+    cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow);
     if (_command != null) {
-      cfgMap.put(JobConfig.COMMAND, _command);
+      cfgMap.put(JobConfigProperty.COMMAND.value(), _command);
     }
     if (_jobCommandConfigMap != null) {
       String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
       if (serializedConfig != null) {
-        cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig);
+        cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig);
       }
     }
     if (_targetResource != null) {
-      cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+      cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource);
     }
     if (_targetPartitionStates != null) {
-      cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+      cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(),
+          Joiner.on(",").join(_targetPartitionStates));
     }
     if (_targetPartitions != null) {
-      cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+      cfgMap
+          .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions));
     }
     if (_retryDelay > 0) {
-      cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay);
+      cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay);
     }
-    cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
-    cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
-    cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
-    cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
-    cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
-    cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
+    cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask);
+    cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask);
+    cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+        "" + _maxForcedReassignmentsPerTask);
+    cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold);
+    cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(),
+        Boolean.toString(_disableExternalView));
+    cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+        "" + _numConcurrentTasksPerInstance);
     return cfgMap;
   }
 
@@ -251,54 +293,58 @@ public class JobConfig {
 
     /**
      * Convenience method to build a {@link JobConfig} from a {@code Map&lt;String, String&gt;}.
+     *
      * @param cfg A map of property names to their string representations.
      * @return A {@link Builder}.
      */
     public static Builder fromMap(Map<String, String> cfg) {
       Builder b = new Builder();
-      if (cfg.containsKey(WORKFLOW_ID)) {
-        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) {
+        b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value()));
       }
-      if (cfg.containsKey(TARGET_RESOURCE)) {
-        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) {
+        b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value()));
       }
-      if (cfg.containsKey(TARGET_PARTITIONS)) {
-        b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+      if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) {
+        b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value())));
       }
-      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
-        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
-            TARGET_PARTITION_STATES).split(","))));
+      if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) {
+        b.setTargetPartitionStates(new HashSet<String>(
+            Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(","))));
       }
-      if (cfg.containsKey(COMMAND)) {
-        b.setCommand(cfg.get(COMMAND));
+      if (cfg.containsKey(JobConfigProperty.COMMAND.value())) {
+        b.setCommand(cfg.get(JobConfigProperty.COMMAND.value()));
       }
-      if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) {
-        Map<String, String> commandConfigMap =
-            TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP));
+      if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) {
+        Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap(
+            cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value()));
         b.setJobCommandConfigMap(commandConfigMap);
       }
-      if (cfg.containsKey(TIMEOUT_PER_TASK)) {
-        b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+      if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) {
+        b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value())));
       }
-      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
-        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
-            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) {
+        b.setNumConcurrentTasksPerInstance(
+            Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())));
       }
-      if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
-        b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+      if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) {
+        b.setMaxAttemptsPerTask(
+            Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())));
       }
-      if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) {
-        b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg
-            .get(MAX_FORCED_REASSIGNMENTS_PER_TASK)));
+      if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) {
+        b.setMaxForcedReassignmentsPerTask(
+            Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())));
       }
-      if (cfg.containsKey(FAILURE_THRESHOLD)) {
-        b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+      if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) {
+        b.setFailureThreshold(
+            Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value())));
       }
-      if (cfg.containsKey(TASK_RETRY_DELAY)) {
-        b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
+      if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) {
+        b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value())));
       }
-      if (cfg.containsKey(DISABLE_EXTERNALVIEW)) {
-        b.setDisableExternalView(Boolean.valueOf(cfg.get(DISABLE_EXTERNALVIEW)));
+      if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) {
+        b.setDisableExternalView(
+            Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
       }
       return b;
     }
@@ -384,38 +430,46 @@ public class JobConfig {
 
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+        throw new IllegalArgumentException(
+            String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE));
       }
-      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
-          && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
-            TARGET_PARTITION_STATES));
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
+          .isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES));
       }
       if (_taskConfigMap.isEmpty() && _command == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+        throw new IllegalArgumentException(
+            String.format("%s cannot be null", JobConfigProperty.COMMAND));
       }
       if (_timeoutPerTask < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            TIMEOUT_PER_TASK, _timeoutPerTask));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK,
+                _timeoutPerTask));
       }
       if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+                _numConcurrentTasksPerInstance));
       }
       if (_maxAttemptsPerTask < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK,
+                _maxAttemptsPerTask));
       }
       if (_maxForcedReassignmentsPerTask < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+                _maxForcedReassignmentsPerTask));
       }
       if (_failureThreshold < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            FAILURE_THRESHOLD, _failureThreshold));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD,
+                _failureThreshold));
       }
       if (_workflow == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+        throw new IllegalArgumentException(
+            String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID));
       }
     }
 

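The enum keeps the serialized field names identical, so existing ZooKeeper data still round-trips: getResourceConfigMap() writes the same strings that Builder.fromMap() reads back. A sketch of that round trip (assuming the usual build() terminal call on the builder; the command and resource names are illustrative):

    JobConfig original = new JobConfig.Builder()
        .setWorkflow("wf1")
        .setCommand("Reindex")
        .setTargetResource("TestDB")
        .build();
    Map<String, String> persisted = original.getResourceConfigMap(); // e.g. "Command" -> "Reindex"
    JobConfig restored = JobConfig.Builder.fromMap(persisted).build();
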
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 9b64aec..c4986ee 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -55,6 +55,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d804fab..524b889 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -402,10 +402,10 @@ public class TaskUtil {
     Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
     JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
     Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
 
     // Set the workflow expiry
-    builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+    workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
 
     // Set the schedule, if applicable
     ScheduleConfig scheduleConfig;
@@ -415,7 +415,7 @@ public class TaskUtil {
       scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
     }
     if (scheduleConfig != null) {
-      builder.setScheduleConfig(scheduleConfig);
+      workflowBuilder.setScheduleConfig(scheduleConfig);
     }
 
     // Add each job back as long as the original exists
@@ -426,29 +426,30 @@ public class TaskUtil {
         String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
         HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
         Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-        jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
-        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
-          builder.addConfig(job, e.getKey(), e.getValue());
-        }
+
+        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
+
+        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
         Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
         List<TaskConfig> taskConfigs = Lists.newLinkedList();
         for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
           TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
           taskConfigs.add(taskConfig);
         }
-        builder.addTaskConfigs(job, taskConfigs);
+        jobCfgBuilder.addTaskConfigs(taskConfigs);
+        workflowBuilder.addJobConfig(job, jobCfgBuilder);
 
         // Add dag dependencies
         Set<String> children = parentsToChildren.get(namespacedJob);
         if (children != null) {
           for (String namespacedChild : children) {
             String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
-            builder.addParentChildDependency(job, child);
+            workflowBuilder.addParentChildDependency(job, child);
           }
         }
       }
     }
-    return builder.build();
+    return workflowBuilder.build();
   }
 
   private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,

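Condensed, the workflow-cloning path above now flows through the typed builder instead of copying raw simple fields (variables as in the hunk above):

    JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
    jobCfgBuilder.setWorkflow(newWorkflowName);   // re-point the clone at the new workflow
    jobCfgBuilder.addTaskConfigs(taskConfigs);    // carry over any per-task configs
    workflowBuilder.addJobConfig(job, jobCfgBuilder);
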
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 8ea2691..3a050c2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -128,7 +128,9 @@ public class Workflow {
     return parse(new StringReader(yaml));
   }
 
-  /** Helper function to parse workflow from a generic {@link Reader} */
+  /**
+   * Helper function to parse workflow from a generic {@link Reader}
+   */
   private static Workflow parse(Reader reader) throws Exception {
     Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
@@ -146,29 +148,32 @@ public class Workflow {
           }
         }
 
-        builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
-        builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name);
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command);
         if (job.jobConfigMap != null) {
           builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
         }
-        builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(),
+            job.targetResource);
         if (job.targetPartitionStates != null) {
-          builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+          builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(),
               Joiner.on(",").join(job.targetPartitionStates));
         }
         if (job.targetPartitions != null) {
-          builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+          builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(),
               Joiner.on(",").join(job.targetPartitions));
         }
-        builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(),
             String.valueOf(job.maxAttemptsPerTask));
-        builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+        builder.addConfig(job.name,
+            JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
             String.valueOf(job.maxForcedReassignmentsPerTask));
-        builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+        builder.addConfig(job.name,
+            JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
             String.valueOf(job.numConcurrentTasksPerInstance));
-        builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(),
             String.valueOf(job.timeoutPerPartition));
-        builder.addConfig(job.name, JobConfig.FAILURE_THRESHOLD,
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(),
             String.valueOf(job.failureThreshold));
         if (job.tasks != null) {
           List<TaskConfig> taskConfigs = Lists.newArrayList();
@@ -242,7 +247,7 @@ public class Workflow {
       _expiry = -1;
     }
 
-    public Builder addConfig(String job, String key, String val) {
+    private Builder addConfig(String job, String key, String val) {
       job = namespacify(job);
       _dag.addNode(job);
       if (!_jobConfigs.containsKey(job)) {
@@ -252,8 +257,8 @@ public class Workflow {
       return this;
     }
 
-    public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
-      return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP,
+    private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
+      return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(),
           TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
     }
 
@@ -268,7 +273,7 @@ public class Workflow {
       return this;
     }
 
-    public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+    private Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
       job = namespacify(job);
       _dag.addNode(job);
       if (!_taskConfigs.containsKey(job)) {
@@ -322,7 +327,7 @@ public class Workflow {
     protected WorkflowConfig.Builder buildWorkflowConfig() {
       for (String task : _jobConfigs.keySet()) {
         // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
-        _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
+        _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name);
       }
 
       WorkflowConfig.Builder builder;

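Since addConfig() and addTaskConfigs() are private now, the YAML parser keeps using them internally while client code goes through addJobConfig(). A minimal sketch of the client-side pattern (job names, commands and the target resource are illustrative):

    Workflow.Builder wb = new Workflow.Builder("myWorkflow");

    JobConfig.Builder backup = new JobConfig.Builder()
        .setCommand("Backup").setTargetResource("TestDB");
    JobConfig.Builder verify = new JobConfig.Builder()
        .setCommand("Verify").setTargetResource("TestDB");

    wb.addJobConfig("backupJob", backup);
    wb.addJobConfig("verifyJob", verify);
    wb.addParentChildDependency("backupJob", "verifyJob"); // verify runs after backup
    Workflow workflow = wb.build();
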
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index a00a736..40c2485 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -140,8 +140,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _runCounts.clear();
   }
 
-  @Test
-  public void testDifferentTasks() throws Exception {
+  @Test public void testDifferentTasks() throws Exception {
     // Create a job with two different tasks
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -150,11 +149,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -166,8 +166,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test
-  public void testThresholdFailure() throws Exception {
+  @Test public void testThresholdFailure() throws Exception {
     // Create a job with two different tasks
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -177,12 +176,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1)
+            .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -194,8 +193,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test
-  public void testOptionalTaskFailure() throws Exception {
+  @Test public void testOptionalTaskFailure() throws Exception {
     // Create a job with two different tasks
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -205,11 +203,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -221,24 +222,23 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test
-  public void testReassignment() throws Exception {
+  @Test public void testReassignment() throws Exception {
     final int NUM_INSTANCES = 2;
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    Map<String, String> taskConfigMap =
-        Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_'
-            + START_PORT));
+    Map<String, String> taskConfigMap = Maps.newHashMap(
+        ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, ""
-        + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -251,8 +251,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     // Ensure that this was tried on two different instances, the first of which exhausted the
     // attempts number, and the other passes on the first try
     Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
-    Assert.assertTrue(_runCounts.values().contains(
-        JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
+    Assert.assertTrue(
+        _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
     Assert.assertTrue(_runCounts.values().contains(1));
   }
 
@@ -264,11 +264,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, String> taskConfigMap = Maps.newHashMap();
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
     workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
     _driver.start(workflowBuilder.build());
@@ -295,11 +298,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, String> taskConfigMap = Maps.newHashMap();
     TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay));
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     SingleFailTask.hasFailed = false;
     _driver.start(workflowBuilder.build());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 79adcd5..da13ada 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -215,12 +215,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
-      JobConfig.Builder job =
+      JobConfig.Builder jobConfig =
           new JobConfig.Builder().setCommand("Reindex")
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
-      queueBuild.enqueueJob(jobName, job);
+      queueBuild.enqueueJob(jobName, jobConfig);
       currentJobNames.add(jobName);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f402b82..3352d1c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -163,9 +163,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     String jobName = "Expiry";
     long expiry = 1000;
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
-    Workflow flow =
-        WorkflowGenerator
-            .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig);
+
+    Workflow flow = WorkflowGenerator
+            .generateSingleJobWorkflowBuilder(jobName, jobBuilder)
             .setExpiry(expiry).build();
 
     _driver.start(flow);
@@ -204,9 +206,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     final String jobResource = "basic" + jobCompletionTime;
     Map<String, String> commandConfig =
         ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            commandConfig).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
     // Wait for job completion
@@ -220,18 +225,20 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     }
   }
 
-  @Test
-  public void partitionSet() throws Exception {
+  @Test public void partitionSet() throws Exception {
     final String jobResource = "partitionSet";
     ImmutableList<String> targetPartitions =
         ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
 
     // construct and submit our basic workflow
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig).setMaxAttemptsPerTask(1)
+        .setTargetPartitions(targetPartitions);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
-            JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
     // wait for job completeness/timeout
@@ -268,13 +275,15 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     }
   }
 
-  @Test
-  public void timeouts() throws Exception {
+  @Test public void timeouts() throws Exception {
     final String jobResource = "timeouts";
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+        .setMaxAttemptsPerTask(2).setTimeoutPerTask(100);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
-            String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
     // Wait until the job reports failure.

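The pattern repeated throughout this file replaces the old varargs string
configs (JobConfig.MAX_ATTEMPTS_PER_TASK, JobConfig.TIMEOUT_PER_TASK, and so
on) with typed setters on a builder seeded from the shared default map.
Roughly, condensed from the hunks above:

  JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
  jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
      .setMaxAttemptsPerTask(2)   // was JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(2)
      .setTimeoutPerTask(100);    // was JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)
  Workflow flow =
      WorkflowGenerator.generateSingleJobWorkflowBuilder("timeouts", jobBuilder).build();
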
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index b678d7e..efe90b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -125,18 +125,15 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
-  @Test
-  public void test() throws Exception {
+  @Test public void test() throws Exception {
     String jobResource = TestHelper.getTestMethodName();
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+        .setMaxAttemptsPerTask(2).setCommand("ErrorTask").setFailureThreshold(Integer.MAX_VALUE);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
-            String.valueOf(2)).build();
-    Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs();
-    for (Map<String, String> jobConfig : jobConfigs.values()) {
-      jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
-      jobConfig.put(JobConfig.COMMAND, "ErrorTask");
-    }
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
 
     _driver.start(flow);
 
@@ -151,7 +148,6 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
         Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2);
       }
     }
-
   }
 
   private static class ErrorTask implements Task {

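The same builder-first style also removes the post-hoc mutation of the
generated job config maps in the retry-limit test; condensed, its setup is:

  JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
  jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
      .setMaxAttemptsPerTask(2)                 // each task may be attempted at most twice
      .setCommand("ErrorTask")                  // the test's failing task implementation
      .setFailureThreshold(Integer.MAX_VALUE);  // keep the job running despite task failures
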
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 8a44672..7437b72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -162,12 +162,14 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
-  @Test
-  public void stopAndResume() throws Exception {
+  @Test public void stopAndResume() throws Exception {
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+    JobConfig.Builder jobBuilder =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig);
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
-            commandConfig).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(JOB_RESOURCE, jobBuilder).build();
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index a414f5c..23c35af 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -56,58 +56,34 @@ public class WorkflowGenerator {
     DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
-  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
-      String jobName, Map<String, String> commandConfig, String... cfgs) {
-    if (cfgs.length % 2 != 0) {
-      throw new IllegalArgumentException(
-          "Additional configs should have even number of keys and values");
-    }
-    Workflow.Builder bldr = generateSingleJobWorkflowBuilder(jobName, commandConfig, DEFAULT_JOB_CONFIG);
-    for (int i = 0; i < cfgs.length; i += 2) {
-      bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
-    }
-
-    return bldr;
+  private static final JobConfig.Builder DEFAULT_JOB_BUILDER;
+  static {
+    JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+    DEFAULT_JOB_BUILDER = builder;
   }
 
   public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
-    return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+    return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
   }
 
   public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
-      Map<String, String> commandConfig, Map<String, String> config) {
-    Workflow.Builder builder = new Workflow.Builder(jobName);
-    for (String key : config.keySet()) {
-      builder.addConfig(jobName, key, config.get(key));
-    }
-    if (commandConfig != null) {
-      ObjectMapper mapper = new ObjectMapper();
-      try {
-        String serializedMap = mapper.writeValueAsString(commandConfig);
-        builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
-      } catch (IOException e) {
-        LOG.error("Error serializing " + commandConfig, e);
-      }
-    }
-    return builder;
+      JobConfig.Builder jobBuilder) {
+    return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder);
   }
 
   public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
     Workflow.Builder builder = new Workflow.Builder(workflowName);
     builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
 
-    for (String key : DEFAULT_JOB_CONFIG.keySet()) {
-      builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
-      builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
-    }
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
-      builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
-      builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
-    } catch (IOException e) {
-      LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
-    }
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+    builder.addJobConfig(JOB_NAME_1, jobBuilder);
+    builder.addJobConfig(JOB_NAME_2, jobBuilder);
+
     return builder;
   }
 }

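One detail worth noting in the refactored generator: generateDefaultSingleJobWorkflowBuilder
constructs a fresh JobConfig.Builder on each call rather than handing out the
static DEFAULT_JOB_BUILDER, presumably to avoid sharing a mutable builder
between tests. The repeated-job helper follows the same shape; a condensed
sketch of wiring a two-job dependency (the names are the generator's own
constants, the workflow name is illustrative):

  Workflow.Builder builder = new Workflow.Builder("repeatedJobs");
  builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);  // JOB_NAME_1 runs first
  JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
  jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
  builder.addJobConfig(JOB_NAME_1, jobBuilder);  // the same builder backs both jobs
  builder.addJobConfig(JOB_NAME_2, jobBuilder);
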

[3/4] helix git commit: Clean up unit tests for task framework.

Posted by lx...@apache.org.
Clean up unit tests for task framework.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d381a3a1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d381a3a1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d381a3a1

Branch: refs/heads/helix-0.6.x
Commit: d381a3a1cc69d129388896907b9cc696811650c7
Parents: 79c490f
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 16:33:08 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../webapp/resources/TestJobQueuesResource.java |   7 +-
 .../helix/integration/task/DummyTask.java       |  72 ------
 .../apache/helix/integration/task/MockTask.java |  73 ++++++
 .../helix/integration/task/TaskTestUtil.java    | 253 +++++++++++++++++++
 .../task/TestIndependentTaskRebalancer.java     |  23 +-
 .../integration/task/TestRecurringJobQueue.java | 157 +++---------
 .../integration/task/TestTaskRebalancer.java    |  72 +-----
 .../task/TestTaskRebalancerFailover.java        |  12 +-
 .../task/TestTaskRebalancerParallel.java        |  60 +----
 .../task/TestTaskRebalancerRetryLimit.java      |   2 +-
 .../task/TestTaskRebalancerStopResume.java      | 154 ++++-------
 .../apache/helix/integration/task/TestUtil.java | 207 ---------------
 .../integration/task/WorkflowGenerator.java     |   4 +-
 13 files changed, 451 insertions(+), 645 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index 9c2306a..5d8a93b 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -29,7 +29,7 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.DummyTask;
+import org.apache.helix.integration.task.MockTask;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.Task;
@@ -42,9 +42,6 @@ import org.apache.helix.task.beans.WorkflowBean;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.webapp.AdminTestBase;
 import org.apache.helix.webapp.AdminTestHelper;
-import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
-import org.apache.helix.webapp.resources.JsonParameters;
-import org.apache.helix.webapp.resources.ResourceUtil;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -81,7 +78,7 @@ public class TestJobQueuesResource extends AdminTestBase {
     taskFactoryReg.put("DummyTask", new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new DummyTask(context);
+        return new MockTask(context);
       }
     });
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
deleted file mode 100644
index b6054d0..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-
-public class DummyTask implements Task {
-  private static final String TIMEOUT_CONFIG = "Timeout";
-  private final long _delay;
-  private volatile boolean _canceled;
-
-  public DummyTask(TaskCallbackContext context) {
-    JobConfig jobCfg = context.getJobConfig();
-    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-    if (cfg == null) {
-      cfg = Collections.emptyMap();
-    }
-    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-  }
-
-  @Override
-  public TaskResult run() {
-    long expiry = System.currentTimeMillis() + _delay;
-    long timeLeft;
-    while (System.currentTimeMillis() < expiry) {
-      if (_canceled) {
-        timeLeft = expiry - System.currentTimeMillis();
-        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-            : timeLeft));
-      }
-      sleep(50);
-    }
-    timeLeft = expiry - System.currentTimeMillis();
-    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-  }
-
-  @Override
-  public void cancel() {
-    _canceled = true;
-  }
-
-  private static void sleep(long d) {
-    try {
-      Thread.sleep(d);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
new file mode 100644
index 0000000..71fa12d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+public class MockTask implements Task {
+  public static final String TASK_COMMAND = "Reindex";
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private final long _delay;
+  private volatile boolean _canceled;
+
+  public MockTask(TaskCallbackContext context) {
+    JobConfig jobCfg = context.getJobConfig();
+    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+    if (cfg == null) {
+      cfg = Collections.emptyMap();
+    }
+    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+  }
+
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    long timeLeft;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled) {
+        timeLeft = expiry - System.currentTimeMillis();
+        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+            : timeLeft));
+      }
+      sleep(50);
+    }
+    timeLeft = expiry - System.currentTimeMillis();
+    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+  }
+
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  private static void sleep(long d) {
+    try {
+      Thread.sleep(d);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}

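MockTask is a rename of DummyTask plus a public TASK_COMMAND constant
("Reindex"), so tests can register the factory under that constant instead of
repeating the string literal. A sketch of the registration the updated tests
perform (the surrounding participant setup is elided):

  Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
  taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
    @Override
    public Task createNewTask(TaskCallbackContext context) {
      return new MockTask(context);
    }
  });
  // A job whose command is MockTask.TASK_COMMAND then runs MockTask, which
  // sleeps for the "Timeout" value in its job command config (200 ms if unset).
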
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
new file mode 100644
index 0000000..c5dd099
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -0,0 +1,253 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+
+/**
+ * Static test utility methods.
+ */
+public class TaskTestUtil {
+  private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
+
+  /**
+   * Polls the {@link org.apache.helix.task.WorkflowContext} for the given workflow resource
+   * until it reaches the given state or a timeout elapses.
+   * If the workflow has not reached the target state by then, an assertion error is raised.
+   * @param workflowResource workflow resource to poll for completion
+   * @throws InterruptedException
+   */
+  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+      TaskState state) throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
+        && System.currentTimeMillis() < st + _default_timeout);
+
+    Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getWorkflowState(), state);
+  }
+
+  /**
+   * Polls the job until it reaches any of the states in targetStates.
+   * @param manager
+   * @param workflowResource
+   * @param jobName
+   * @param targetStates
+   * @throws InterruptedException
+   */
+  public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
+      TaskState... targetStates) throws InterruptedException {
+    // Get workflow config
+    WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
+    Assert.assertNotNull(wfCfg);
+    WorkflowContext ctx;
+    if (wfCfg.isRecurring()) {
+      // if it's recurring, need to reconstruct workflow and job name
+      long start = System.currentTimeMillis();
+      do {
+        Thread.sleep(100);
+        ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null)
+          && System.currentTimeMillis() < start + _default_timeout);
+      Assert.assertNotNull(ctx);
+      Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
+      jobName = jobName.substring(workflowResource.length() + 1);
+      workflowResource = ctx.getLastScheduledSingleWorkflow();
+      jobName = String.format("%s_%s", workflowResource, jobName);
+    }
+
+    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
+    // Wait for state
+    long st = System.currentTimeMillis();
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    }
+    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
+        && System.currentTimeMillis() < st + _default_timeout);
+    Assert.assertNotNull(ctx);
+    Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
+  }
+
+  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
+      final String jobName) throws Exception {
+    final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
+    boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
+        return ctx == null || ctx.getJobState(namespacedJobName) == null;
+      }
+    }, _default_timeout);
+    Assert.assertTrue(succeed);
+  }
+
+  public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
+      throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
+    Assert.assertNotNull(ctx);
+    return ctx;
+  }
+
+  // Checks that: 1. different jobs in the same workflow can be RUNNING at the same time, and
+  // 2. no two jobs in the same workflow are RUNNING on the same instance at the same time.
+  public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
+      throws InterruptedException {
+
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
+    Assert.assertNotNull(workflowConfig);
+
+    WorkflowContext workflowContext = null;
+    while (workflowContext == null) {
+      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      Thread.sleep(100);
+    }
+
+    int maxRunningCount = 0;
+    boolean finished = false;
+
+    while (!finished) {
+      finished = true;
+      int runningCount = 0;
+
+      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+        TaskState jobState = workflowContext.getJobState(jobName);
+        if (jobState == TaskState.IN_PROGRESS) {
+          ++runningCount;
+          finished = false;
+        }
+      }
+
+      if (runningCount > maxRunningCount) {
+        maxRunningCount = runningCount;
+      }
+
+      List<JobContext> jobContextList = new ArrayList<JobContext>();
+      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+        JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
+        if (jobContext != null) {
+          jobContextList.add(jobContext);
+        }
+      }
+
+      Set<String> instances = new HashSet<String>();
+      for (JobContext jobContext : jobContextList) {
+        for (int partition : jobContext.getPartitionSet()) {
+          String instance = jobContext.getAssignedParticipant(partition);
+          TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
+
+          if (instance == null) {
+            continue;
+          }
+          if (taskPartitionState != TaskPartitionState.INIT &&
+              taskPartitionState != TaskPartitionState.RUNNING) {
+            continue;
+          }
+          if (instances.contains(instance)) {
+            return false;
+          }
+
+          // The partition is INIT or RUNNING at this point, so record the busy instance.
+          instances.add(instance);
+        }
+      }
+
+      Thread.sleep(100);
+    }
+
+    return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
+  }
+
+  public static Date getDateFromStartTime(String startTime) {
+    int splitIndex = startTime.indexOf(':');
+    int hourOfDay = 0, minutes = 0;
+    try {
+      hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
+      minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
+    } catch (NumberFormatException e) {
+      // Ignore a malformed start time and fall back to 00:00.
+    }
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
+    cal.set(Calendar.MINUTE, minutes);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTime();
+  }
+
+  public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
+    cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
+    cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
+    cal.set(Calendar.MILLISECOND, 0);
+    cfgMap.put(WorkflowConfig.START_TIME,
+        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    //cfgMap.put(WorkflowConfig.START_TIME,
+    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+  }
+
+  public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
+    return buildRecurrentJobQueue(jobQueueName, 0);
+  }
+
+  // Placeholder: participant-level parallelism verification is not implemented yet.
+  public static boolean pollForParticipantParallelState() {
+    return false;
+  }
+}

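TaskTestUtil gathers the polling and queue-building helpers that each test
class previously duplicated. A condensed sketch of the typical sequence in the
queue tests below, assuming a TaskDriver _driver and HelixManager _manager as
those tests define (queue and job names are illustrative):

  JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue("myQueue", 5);
  queueBuilder.enqueueJob("masterJob", new JobConfig.Builder()
      .setCommand(MockTask.TASK_COMMAND)
      .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
      .setTargetPartitionStates(Sets.newHashSet("MASTER")));
  _driver.createQueue(queueBuilder.build());

  // Recurring queues schedule per-run workflows, so resolve the latest one first.
  WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, "myQueue");
  String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
  TaskTestUtil.pollForJobState(_manager, scheduledQueue,
      String.format("%s_%s", scheduledQueue, "masterJob"), TaskState.COMPLETED);
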
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 40c2485..ba8367e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -32,7 +32,6 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -158,8 +157,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -185,8 +184,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -214,8 +213,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -242,8 +241,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -277,7 +276,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -309,7 +308,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure completion
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure a single retry happened
     JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName);
@@ -317,7 +316,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay);
   }
 
-  private class TaskOne extends ReindexTask {
+  private class TaskOne extends MockTask {
     private final boolean _shouldFail;
     private final String _instanceName;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index da13ada..4e21ef7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -20,9 +20,6 @@ package org.apache.helix.integration.task;
  */
 
 import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,11 +49,9 @@ import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -108,10 +103,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory() {
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new ReindexTask(context);
+        return new MockTask(context);
       }
     });
 
@@ -162,46 +157,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
-  private Date getDateFromStartTime(String startTime)
-  {
-    int splitIndex = startTime.indexOf(':');
-    int hourOfDay = 0, minutes = 0;
-    try
-    {
-      hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
-      minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
-    }
-    catch (NumberFormatException e)
-    {
-
-    }
-    Calendar cal = Calendar.getInstance();
-    cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
-    cal.set(Calendar.MINUTE, minutes);
-    cal.set(Calendar.SECOND, 0);
-    cal.set(Calendar.MILLISECOND, 0);
-    return cal.getTime();
-  }
 
-  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
-    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
-    cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
-    Calendar cal = Calendar.getInstance();
-    cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
-    cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
-    cal.set(Calendar.MILLISECOND, 0);
-    cfgMap.put(WorkflowConfig.START_TIME,
-        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
-    //cfgMap.put(WorkflowConfig.START_TIME,
-    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
-    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
-  }
-
-  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
-    return buildRecurrentJobQueue(jobQueueName, 0);
-  }
 
   @Test
   public void deleteRecreateRecurrentQueue() throws Exception {
@@ -209,14 +165,14 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
+    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -226,25 +182,25 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     _driver.start(queueBuild.build());
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure job 1 is started before stop it
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
-    TestUtil
+    TaskTestUtil
         .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
     _driver.delete(queueName);
     Thread.sleep(500);
 
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -255,17 +211,17 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.createQueue(queueBuilder.build());
 
 
-    wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure jobs are started and completed
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
-    TestUtil
+    TaskTestUtil
         .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
 
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1));
-    TestUtil
+    TaskTestUtil
         .pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
   }
 
@@ -275,7 +231,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
@@ -285,7 +241,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setJobCommandConfigMap(commandConfig)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
@@ -296,20 +252,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     }
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
-        TaskState.COMPLETED);
+    TaskTestUtil
+        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+            TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -320,21 +277,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS,
         TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
-    TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -350,7 +307,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -366,7 +323,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
 
     // create jobs
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -378,7 +335,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(commandConfig)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       jobs.add(job);
@@ -395,12 +352,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -424,17 +381,17 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
 
-    JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex")
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
 
-    JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex")
+    JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
 
-    JobConfig.Builder job3 = new JobConfig.Builder().setCommand("Reindex")
+    JobConfig.Builder job3 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
 
@@ -445,12 +402,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are completed
     String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3");
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
 
     Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
     String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
@@ -488,51 +445,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
-    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
-  }
-
-  public static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(TaskCallbackContext context) {
-      JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-      if (cfg == null) {
-        cfg = Collections.emptyMap();
-      }
-      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(10L);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 3352d1c..787ebcc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -45,7 +44,6 @@ import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
@@ -98,10 +96,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory() {
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new ReindexTask(context);
+        return new MockTask(context);
       }
     });
 
@@ -171,7 +169,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
             .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -185,7 +183,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
     // Wait for job to finish and expire
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
     TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
     Thread.sleep(expiry);
@@ -215,7 +213,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait for job completion
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
     JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
@@ -242,7 +240,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // wait for job completeness/timeout
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
     String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
@@ -267,11 +265,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     new TaskDriver(_manager).start(flow);
 
     // Wait until the workflow completes
-    TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
     for (String task : flow.getJobConfigs().keySet()) {
-      TestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
     }
   }
 
@@ -287,7 +285,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait until the job reports failure.
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
     JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
@@ -314,10 +312,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Set<String> master = Sets.newHashSet("MASTER");
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     _driver.enqueueJob(queueName, "masterJob", job1);
     _driver.enqueueJob(queueName, "slaveJob", job2);
@@ -325,8 +323,8 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Ensure successful completion
     String namespacedJob1 = queueName + "_masterJob";
     String namespacedJob2 = queueName + "_slaveJob";
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
     JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
     JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
 
@@ -352,48 +350,4 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
   }
-
-  private static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(TaskCallbackContext context) {
-      JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-      if (cfg == null) {
-        cfg = Collections.emptyMap();
-      }
-      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(50);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
 }

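For reference: the per-test ReindexTask inner classes removed above are consolidated into the shared MockTask registered under MockTask.TASK_COMMAND. MockTask itself is not part of this diff; below is a minimal sketch of such a task, assuming it mirrors the deleted ReindexTask implementations (the TASK_COMMAND and TIMEOUT_CONFIG values shown are assumptions, not taken from the MockTask source).

package org.apache.helix.integration.task;

import java.util.Collections;
import java.util.Map;

import org.apache.helix.task.JobConfig;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;

public class MockTask implements Task {
  public static final String TASK_COMMAND = "Reindex";   // assumed value
  public static final String TIMEOUT_CONFIG = "Timeout"; // assumed key

  private final long _delay;
  private volatile boolean _canceled;

  public MockTask(TaskCallbackContext context) {
    JobConfig jobCfg = context.getJobConfig();
    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
    if (cfg == null) {
      cfg = Collections.emptyMap();
    }
    // Run for a configurable delay, defaulting to 200ms, like the deleted ReindexTask.
    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
  }

  @Override
  public TaskResult run() {
    long expiry = System.currentTimeMillis() + _delay;
    while (System.currentTimeMillis() < expiry) {
      if (_canceled) {
        long timeLeft = expiry - System.currentTimeMillis();
        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(Math.max(timeLeft, 0)));
      }
      try {
        Thread.sleep(50L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new TaskResult(TaskResult.Status.COMPLETED, "0");
  }

  @Override
  public void cancel() {
    _canceled = true;
  }
}

Each test then registers the single task once per participant, as the hunks above show: taskFactoryReg.put(MockTask.TASK_COMMAND, ...).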
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index b8e1c09..6f1c48e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -82,10 +82,10 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("DummyTask", new TaskFactory() {
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new DummyTask(context);
+        return new MockTask(context);
       }
     });
 
@@ -143,7 +143,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job =
-        new JobConfig.Builder().setCommand("DummyTask")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job: " + job1Name);
@@ -151,7 +151,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
 
     // check all tasks completed on MASTER
     String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -178,9 +178,9 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job);
 
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
     _participants[0].syncStop();
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
 
     // tasks previously assigned to localhost_12918 should be re-scheduled on new master
     ctx = TaskUtil.getJobContext(_manager, namespacedJob2);

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 2ff8c56..5180a04 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -26,31 +26,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -58,9 +48,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
 public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
@@ -105,10 +92,10 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
 
       final long delay = (i + 1) * 1000L;
       Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-      taskFactoryReg.put("Reindex", new TaskFactory() {
+      taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
         @Override
         public Task createNewTask(TaskCallbackContext context) {
-          return new ReindexTask(delay);
+          return new MockTask(context);
         }
       });
 
@@ -164,7 +151,7 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
     List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
     for (String testDbName : testDbNames) {
       jobConfigBuilders.add(
-          new JobConfig.Builder().setCommand("Reindex").setTargetResource(testDbName)
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName)
               .setTargetPartitionStates(Collections.singleton("SLAVE")));
     }
 
@@ -172,45 +159,6 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
       _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i));
     }
 
-    Assert.assertTrue(TestUtil.pollForWorkflowParallelState(_manager, queueName));
-  }
-
-  public static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(long delay) {
-      _delay = delay;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(50);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
+    Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_manager, queueName));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index efe90b0..d25ffc5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -138,7 +138,7 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < _p; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 7437b72..b67fa90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,7 +21,6 @@ package org.apache.helix.integration.task;
 
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -41,7 +40,6 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -53,7 +51,6 @@ import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
@@ -62,7 +59,6 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.util.PathUtils;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -108,12 +104,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new ReindexTask(context);
-      }
-    });
+    taskFactoryReg
+        .put(MockTask.TASK_COMMAND, new TaskFactory() {
+          @Override public Task createNewTask(TaskCallbackContext context) {
+            return new MockTask(context);
+          }
+        });
 
     // start dummy participants
     for (int i = 0; i < n; i++) {
@@ -173,15 +169,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing job");
     _driver.stop(JOB_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
 
     LOG.info("Resuming job");
     _driver.resume(JOB_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
@@ -191,15 +187,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing workflow");
     _driver.stop(workflow);
-    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
 
     LOG.info("Resuming workflow");
     _driver.resume(workflow);
-    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
   }
 
   @Test
@@ -214,7 +210,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job: " + job1Name);
@@ -222,32 +218,32 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     String job2Name = "slaveJob";
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
 
     // stop job1
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // Ensure job2 is not started
     TimeUnit.MILLISECONDS.sleep(200);
     String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
-    TestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
 
     LOG.info("Resuming job-queue: " + queueName);
     _driver.resume(queueName);
 
     // Ensure successful completion
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
     JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
     JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
 
@@ -282,7 +278,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -294,13 +290,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
-    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -310,22 +306,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TestUtil.pollForJobState(_manager, queueName,
+    TaskTestUtil.pollForJobState(_manager, queueName,
         String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager,
-        queueName,
-        String.format("%s_%s", queueName, currentJobNames.get(1)),
-        TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2);
-    TestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -339,7 +333,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     // add job 3 back
     JobConfig.Builder job =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
             .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
     LOG.info("Enqueuing job: " + deletedJob2);
@@ -350,7 +344,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
-      TestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -398,7 +392,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setJobCommandConfigMap(commandConfig)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
@@ -408,19 +402,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
       currentJobNames.add(i, jobName);
     }
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil
+        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -431,20 +426,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
-    TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -460,7 +455,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -489,10 +484,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     for (int i = 0; i < JOB_COUNTS; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
-      JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       jobs.add(job);
       jobNames.add(targetPartition.toLowerCase() + "Job" + i);
     }
@@ -504,12 +498,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     }
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -537,7 +531,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // Enqueue 2 jobs
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job1: " + job1Name);
@@ -545,17 +539,17 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     String job2Name = "slaveJob";
     LOG.info("Enqueuing job2: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
 
     String namespacedJob2 = String.format("%s_%s", queueName,  job2Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
 
     // Stop queue
     _driver.stop(queueName);
@@ -605,7 +599,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
-    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
   }
 
   private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
@@ -615,48 +609,4 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namedSpacedJobName));
   }
-
-  public static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(TaskCallbackContext context) {
-      JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-      if (cfg == null) {
-        cfg = Collections.emptyMap();
-      }
-      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(50);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
deleted file mode 100644
index d40ac89..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-
-/**
- * Static test utility methods.
- */
-public class TestUtil {
-  private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
-
-  /**
-   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
-   * reached.
-   * If the task has not reached target state by then, an error is thrown
-   * @param workflowResource Resource to poll for completeness
-   * @throws InterruptedException
-   */
-  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
-      TaskState state) throws InterruptedException {
-    // Wait for completion.
-    long st = System.currentTimeMillis();
-    WorkflowContext ctx;
-    do {
-      Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
-        && System.currentTimeMillis() < st + _default_timeout);
-
-    Assert.assertNotNull(ctx);
-    Assert.assertEquals(ctx.getWorkflowState(), state);
-  }
-
-  /**
-   * poll for job until it is at either state in targetStates.
-   * @param manager
-   * @param workflowResource
-   * @param jobName
-   * @param targetStates
-   * @throws InterruptedException
-   */
-  public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
-      TaskState... targetStates) throws InterruptedException {
-    // Get workflow config
-    WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
-    Assert.assertNotNull(wfCfg);
-    WorkflowContext ctx;
-    if (wfCfg.isRecurring()) {
-      // if it's recurring, need to reconstruct workflow and job name
-      do {
-        Thread.sleep(100);
-        ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
-      Assert.assertNotNull(ctx);
-      Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
-      jobName = jobName.substring(workflowResource.length() + 1);
-      workflowResource = ctx.getLastScheduledSingleWorkflow();
-      jobName = String.format("%s_%s", workflowResource, jobName);
-    }
-
-    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
-    // Wait for state
-    long st = System.currentTimeMillis();
-    do {
-      Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    }
-    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
-        && System.currentTimeMillis() < st + _default_timeout);
-    Assert.assertNotNull(ctx);
-    Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
-  }
-
-  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
-      final String jobName) throws Exception {
-    final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
-    boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
-        return ctx == null || ctx.getJobState(namespacedJobName) == null;
-      }
-    }, _default_timeout);
-    Assert.assertTrue(succeed);
-  }
-
-  public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
-      throws InterruptedException {
-    // Wait for completion.
-    long st = System.currentTimeMillis();
-    WorkflowContext ctx;
-    do {
-      Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
-    Assert.assertNotNull(ctx);
-    return ctx;
-  }
-
-  // 1. Different jobs in a same work flow is in RUNNING at the same time
-  // 2. No two jobs in the same work flow is in RUNNING at the same instance
-  public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
-      throws InterruptedException {
-
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
-    Assert.assertNotNull(workflowConfig);
-
-    WorkflowContext workflowContext = null;
-    while (workflowContext == null) {
-      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
-      Thread.sleep(100);
-    }
-
-    int maxRunningCount = 0;
-    boolean finished = false;
-
-    while (!finished) {
-      finished = true;
-      int runningCount = 0;
-
-      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
-      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
-        TaskState jobState = workflowContext.getJobState(jobName);
-        if (jobState == TaskState.IN_PROGRESS) {
-          ++runningCount;
-          finished = false;
-        }
-      }
-
-      if (runningCount > maxRunningCount ) {
-        maxRunningCount = runningCount;
-      }
-
-      List<JobContext> jobContextList = new ArrayList<JobContext>();
-      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
-        JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
-        if (jobContext != null) {
-          jobContextList.add(TaskUtil.getJobContext(manager, jobName));
-        }
-      }
-
-      Set<String> instances = new HashSet<String>();
-      for (JobContext jobContext : jobContextList) {
-        for (int partition : jobContext.getPartitionSet()) {
-          String instance = jobContext.getAssignedParticipant(partition);
-          TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
-
-          if (instance == null) {
-            continue;
-          }
-          if (taskPartitionState != TaskPartitionState.INIT &&
-              taskPartitionState != TaskPartitionState.RUNNING) {
-            continue;
-          }
-          if (instances.contains(instance)) {
-            return false;
-          }
-
-          TaskPartitionState state = jobContext.getPartitionState(partition);
-          if (state != TaskPartitionState.COMPLETED) {
-            instances.add(instance);
-          }
-        }
-      }
-
-      Thread.sleep(100);
-    }
-
-    return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
-  }
-
-  public static boolean pollForParticipantParallelState() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 23c35af..ce3a36a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
@@ -27,7 +26,6 @@ import java.util.TreeMap;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Workflow;
 import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Convenience class for generating various test workflows
@@ -44,7 +42,7 @@ public class WorkflowGenerator {
     Map<String, String> tmpMap = new TreeMap<String, String>();
     tmpMap.put("TargetResource", DEFAULT_TGT_DB);
     tmpMap.put("TargetPartitionStates", "MASTER");
-    tmpMap.put("Command", "Reindex");
+    tmpMap.put("Command", MockTask.TASK_COMMAND);
     tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
     DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
   }


[4/4] helix git commit: [HELIX-622] Add new resource configuration option to allow a resource to disable emitting its monitoring bean.

Posted by lx...@apache.org.
[HELIX-622] Add new resource configuration option to allow a resource to disable emitting its monitoring bean.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/24096019
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/24096019
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/24096019

Branch: refs/heads/helix-0.6.x
Commit: 24096019375e13c439ec6bfa83088ba7c25ffaf9
Parents: 6b6bb8f
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jan 8 17:14:00 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixConstants.java   |   5 +-
 .../main/java/org/apache/helix/PropertyKey.java |   6 +-
 .../stages/BestPossibleStateCalcStage.java      |   1 +
 .../controller/stages/ClusterDataCache.java     |  37 +++++--
 .../stages/ExternalViewComputeStage.java        |  20 ++--
 .../org/apache/helix/model/ResourceConfig.java  | 106 ++++++++++++++++++
 .../mbeans/TestDisableResourceMbean.java        | 109 +++++++++++++++++++
 7 files changed, 264 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
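
Per the diffstat, the option is carried by the new ResourceConfig model and consulted when the controller computes the external view; TestDisableResourceMbean exercises it. Below is a rough sketch of persisting such a flag through the data accessor, assuming it is stored as a simple field on the resource config (the exact field name defined by ResourceConfig is not visible in this excerpt and is hypothetical).

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;

public class DisableResourceMbeanExample {
  // "MONITORING_DISABLED" is an assumed simple-field name; the real key is
  // defined by the new ResourceConfig class, which this excerpt does not show.
  static void disableMonitoring(HelixManager manager, String resourceName) {
    ZNRecord record = new ZNRecord(resourceName);
    record.setSimpleField("MONITORING_DISABLED", "true");
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    // Write the flag under the resource's config node so the controller
    // pipeline can read it on the next rebalance pass.
    accessor.setProperty(accessor.keyBuilder().resourceConfig(resourceName),
        new HelixProperty(record));
  }
}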


http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 6b6287c..5318fa9 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -44,8 +44,9 @@ public interface HelixConstants {
   }
 
   enum ClusterConfigType {
-    HELIX_DISABLE_PIPELINE_TRIGGERS
+    HELIX_DISABLE_PIPELINE_TRIGGERS,
+    DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO
   }
 
-  static final String DEFAULT_STATE_MODEL_FACTORY = "DEFAULT";
+  String DEFAULT_STATE_MODEL_FACTORY = "DEFAULT";
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 663e831..33355f1 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -50,8 +50,10 @@ import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.tools.YAMLClusterSetup;
 import org.apache.log4j.Logger;
 
 /**
@@ -212,7 +214,7 @@ public class PropertyKey {
      * @return {@link PropertyKey}
      */
     public PropertyKey resourceConfigs() {
-      return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
+      return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, ResourceConfig.class,
           _clusterName, ConfigScopeProperty.RESOURCE.toString());
     }
 
@@ -222,7 +224,7 @@ public class PropertyKey {
      * @return {@link PropertyKey}
      */
     public PropertyKey resourceConfig(String resourceName) {
-      return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
+      return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, ResourceConfig.class,
           _clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName);
     }
 

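Since resourceConfig()/resourceConfigs() now carry ResourceConfig.class instead of the bare HelixProperty, reads through the data accessor come back typed. A short usage sketch, assuming a live HelixManager and a placeholder resource name:

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ResourceConfig;

public class ResourceConfigReadExample {
  // "MyDB" is a placeholder resource name; the typed key lets the accessor
  // return a ResourceConfig directly rather than a generic HelixProperty.
  static ResourceConfig read(HelixManager manager) {
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    return accessor.getProperty(keyBuilder.resourceConfig("MyDB"));
  }
}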
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 9a9767e..f12b6e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.Map;
 
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;

http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 7144077..fde4959 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -39,6 +39,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
@@ -61,6 +62,8 @@ public class ClusterDataCache {
   Map<String, StateModelDefinition> _stateModelDefMap;
   Map<String, InstanceConfig> _instanceConfigMap;
   Map<String, InstanceConfig> _instanceConfigCacheMap;
+  Map<String, ResourceConfig> _resourceConfigMap;
+  Map<String, ResourceConfig> _resourceConfigCacheMap;
   Map<String, ClusterConstraints> _constraintMap;
   Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
   Map<String, Map<String, Message>> _messageMap;
@@ -89,10 +92,15 @@ public class ClusterDataCache {
       _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+      _resourceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
     }
     _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
     _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
     _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
+    _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+
+    _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+    _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
 
     if (LOG.isTraceEnabled()) {
       for (LiveInstance instance : _liveInstanceMap.values()) {
@@ -100,9 +108,6 @@ public class ClusterDataCache {
       }
     }
 
-    _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-    _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
-
     Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
     List<PropertyKey> newMessageKeys = Lists.newLinkedList();
     long purgeSum = 0;
@@ -216,10 +221,9 @@ public class ClusterDataCache {
     LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
 
     if (LOG.isDebugEnabled()) {
-      int numPaths =
-          _liveInstanceMap.size() + _idealStateMap.size() + _stateModelDefMap.size()
-              + _instanceConfigMap.size() + _constraintMap.size() + newMessageKeys.size()
-              + currentStateKeys.size();
+      int numPaths = _liveInstanceMap.size() + _idealStateMap.size() + _stateModelDefMap.size()
+          + _instanceConfigMap.size() + _resourceConfigMap.size() + _constraintMap.size()
+          + newMessageKeys.size() + currentStateKeys.size();
       LOG.debug("Paths read: " + numPaths);
     }
 
@@ -341,6 +345,24 @@ public class ClusterDataCache {
     return _instanceConfigMap;
   }
 
+  /**
+   * Returns the resource config map
+   *
+   * @return a map of resource name to resource config
+   */
+  public Map<String, ResourceConfig> getResourceConfigMap() {
+    return _resourceConfigMap;
+  }
+
+  /**
+   * Returns the resource config for the given resource
+   *
+   * @return the resource config, or null if none exists
+   */
+  public ResourceConfig getResourceConfig(String resource) {
+    return _resourceConfigMap.get(resource);
+  }
+
   public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
     Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
     for (InstanceConfig instanceConfig : instanceConfigs) {
@@ -424,6 +446,7 @@ public class ClusterDataCache {
     sb.append("idealStateMap:" + _idealStateMap).append("\n");
     sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
     sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+    sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n");
     sb.append("messageMap:" + _messageMap).append("\n");
 
     return sb.toString();

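A distilled sketch of how a consumer of the cache is expected to use the new accessor; the null check matters because most resources never have a ResourceConfig znode (the ExternalViewComputeStage change below follows the same pattern). The helper class name is hypothetical:

import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.model.ResourceConfig;

public class MonitoringGateSketch {
  // True only when a config znode exists and explicitly disables monitoring.
  public static boolean isMonitoringDisabled(ClusterDataCache cache, String resourceName) {
    ResourceConfig config = cache.getResourceConfig(resourceName);
    return config != null && config.isMonitoringDisabled();
  }
}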
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 1455cd5..d83518d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,6 +44,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -104,20 +105,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         }
       }
       // Update cluster status monitor mbean
-      ClusterStatusMonitor clusterStatusMonitor =
-          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+      ClusterStatusMonitor clusterStatusMonitor = event.getAttribute("clusterStatusMonitor");
       IdealState idealState = cache._idealStateMap.get(resourceName);
-      if (idealState != null) {
-        if (clusterStatusMonitor != null
-            && !idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+      ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
+      if (idealState != null && (resourceConfig == null || !resourceConfig
+          .isMonitoringDisabled())) {
+        if (clusterStatusMonitor != null && !idealState.getStateModelDefRef()
+            .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
           StateModelDefinition stateModelDef =
               cache.getStateModelDef(idealState.getStateModelDefRef());
-          clusterStatusMonitor.setResourceStatus(view,
-              cache._idealStateMap.get(view.getResourceName()), stateModelDef);
+          clusterStatusMonitor
+              .setResourceStatus(view, cache._idealStateMap.get(view.getResourceName()),
+                  stateModelDef);
         }
       } else {
-        // Drop the metrics for the dropped resource
+        // Drop the metrics if the resource has been dropped or its monitoring has been disabled.
         clusterStatusMonitor.unregisterResource(view.getResourceName());
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
new file mode 100644
index 0000000..98433f5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -0,0 +1,106 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+/**
+ * Resource configurations
+ */
+public class ResourceConfig extends HelixProperty {
+  /**
+   * Configurable characteristics of a resource
+   */
+  public enum ResourceConfigProperty {
+    MONITORING_DISABLED, // Resource-level config: when true, do not create an MBean or report any status for the resource.
+  }
+
+  private static final Logger _logger = Logger.getLogger(ResourceConfig.class.getName());
+
+  /**
+   * Instantiate for a specific resource
+   *
+   * @param resourceId the resource identifier
+   */
+  public ResourceConfig(String resourceId) {
+    super(resourceId);
+  }
+
+  /**
+   * Instantiate with a pre-populated record
+   *
+   * @param record a ZNRecord corresponding to a resource configuration
+   */
+  public ResourceConfig(ZNRecord record) {
+    super(record);
+  }
+
+  /**
+   * Check whether monitoring is disabled for this resource.
+   *
+   * @return true if monitoring is disabled for this resource, false otherwise
+   */
+  public Boolean isMonitoringDisabled() {
+    return _record.getBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), false);
+  }
+
+  /**
+   * Set whether to disable monitoring for this resource.
+   *
+   * @param monitoringDisabled whether to disable monitoring for this resource.
+   */
+  public void setMonitoringDisabled(boolean monitoringDisabled) {
+    _record
+        .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ResourceConfig) {
+      ResourceConfig that = (ResourceConfig) obj;
+
+      if (this.getId().equals(that.getId())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  /**
+   * Get the name of this resource
+   *
+   * @return the resource name
+   */
+  public String getResourceName() {
+    return _record.getId();
+  }
+
+  @Override
+  public boolean isValid() {
+    return true;
+  }
+}

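To exercise the new model class end to end, a minimal sketch of disabling monitoring for one resource by writing its ResourceConfig through a HelixDataAccessor; the test below achieves the same effect via ConfigAccessor. The helper class name and accessor are assumptions:

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.model.ResourceConfig;

public class DisableMonitoringSketch {
  // Write a ResourceConfig that tells the controller not to register an MBean
  // for this resource; equivalent to what the test below does via ConfigAccessor.
  public static void disableMonitoring(HelixDataAccessor accessor, String resourceName) {
    ResourceConfig config = new ResourceConfig(resourceName);
    config.setMonitoringDisabled(true);
    accessor.setProperty(accessor.keyBuilder().resourceConfig(resourceName), config);
  }
}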
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
new file mode 100644
index 0000000..6d67fc7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -0,0 +1,109 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class TestDisableResourceMbean extends ZkUnitTestBase {
+  private MBeanServerConnection _mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+  @Test public void testDisableResourceMonitoring() throws Exception {
+    final int NUM_PARTICIPANTS = 2;
+    String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        3, // resources
+        32, // partitions per resource
+        4, // number of nodes
+        1, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+        true); // do rebalance
+
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      participants[i] =
+          new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
+      participants[i].syncStart();
+    }
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    HelixConfigScope resourceScope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+            .forCluster(clusterName).forResource("TestDB1").build();
+    configAccessor
+        .set(resourceScope, ResourceConfig.ResourceConfigProperty.MONITORING_DISABLED.name(),
+            "true");
+
+    resourceScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+        .forCluster(clusterName).forResource("TestDB2").build();
+    configAccessor
+        .set(resourceScope, ResourceConfig.ResourceConfigProperty.MONITORING_DISABLED.name(),
+            "false");
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    Thread.sleep(300);
+
+    // Verify that beans were created for TestDB0 and TestDB2, but not for TestDB1 (monitoring disabled).
+    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName)));
+    Assert.assertFalse(_mbeanServer.isRegistered(getMbeanName("TestDB1", clusterName)));
+    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB2", clusterName)));
+
+    controller.syncStop();
+    for (MockParticipantManager participant : participants) {
+      participant.syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  private ObjectName getMbeanName(String resourceName, String clusterName)
+      throws MalformedObjectNameException {
+    String clusterBeanName =
+        String.format("%s=%s", ClusterStatusMonitor.CLUSTER_DN_KEY, clusterName);
+    String resourceBeanName = String
+        .format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY, resourceName);
+    return new ObjectName(
+        String.format("%s: %s", ClusterStatusMonitor.CLUSTER_STATUS_KEY, resourceBeanName));
+  }
+}
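
To run only the new test from the repository root, a standard surefire invocation along these lines should work (assuming the module's default test configuration):

  mvn test -Dtest=TestDisableResourceMbean -pl helix-core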