You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/20 23:33:30 UTC

[2/2] git commit: fixing compile errors

fixing compile errors


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/925b7e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/925b7e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/925b7e94

Branch: refs/heads/helix-yarn
Commit: 925b7e94e2c73d9cd133a109a87d4cc46ad3b94d
Parents: 1ec06f5
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Nov 20 14:33:20 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 20 14:33:20 2013 -0800

----------------------------------------------------------------------
 .../controller/stages/ResourceCurrentState.java |  20 +
 .../org/apache/helix/task/TaskRebalancer.java   | 638 ++++++++-----------
 .../java/org/apache/helix/task/TaskUtil.java    | 124 ++--
 .../integration/task/TestTaskRebalancer.java    | 247 ++++---
 .../task/TestTaskRebalancerStopResume.java      | 150 ++---
 .../apache/helix/integration/task/TestUtil.java | 113 ++--
 .../autoscale/provider/ProviderRebalancer.java  | 102 +--
 .../apache/helix/autoscale/HelixYarnTest.java   |   1 +
 8 files changed, 632 insertions(+), 763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index f04afd0..f986765 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -47,6 +47,13 @@ public class ResourceCurrentState {
   private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
 
   /**
+   * map of resource-id to map of partition-id to map of participant-id to state
+   * represents the states requested for each participant
+   * TODO: this isn't populated
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _requestedStateMap;
+
+  /**
    * map of resource-id to state model definition id
    */
   private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
@@ -62,6 +69,7 @@ public class ResourceCurrentState {
   public ResourceCurrentState() {
     _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
     _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+    _requestedStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
     _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
     _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
 
@@ -225,6 +233,18 @@ public class ResourceCurrentState {
   }
 
   /**
+   * given (resource, partition, participant), returns the requested state
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return
+   */
+  public State getRequestedState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId) {
+    return getState(_requestedStateMap, resourceId, partitionId, participantId);
+  }
+
+  /**
    * @param resourceId
    * @param partitionId
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 5664713..997fe3b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -3,10 +3,6 @@
  */
 package org.apache.helix.task;
 
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -16,45 +12,57 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Custom rebalancer implementation for the {@code Task} state model.
- *
  * @author Abe <as...@linkedin.com>
  * @version $Revision$
  */
-public class TaskRebalancer implements Rebalancer
-{
+public class TaskRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
   private HelixManager _manager;
 
   @Override
-  public void init(HelixManager manager)
-  {
+  public void init(HelixManager manager) {
     _manager = manager;
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(Resource resource,
-                                                   IdealState taskIs,
-                                                   CurrentStateOutput currStateOutput,
-                                                   ClusterDataCache clusterData)
-  {
-    final String resourceName = resource.getResourceName();
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    PartitionedRebalancerContext context =
+        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+    ResourceId resourceId = context.getResourceId();
+    String resourceName = resourceId.toString();
+
+    // get the ideal state
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+    IdealState taskIs = accessor.getProperty(propertyKey);
 
     // Fetch task configuration
     TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
@@ -65,33 +73,29 @@ public class TaskRebalancer implements Rebalancer
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
 
     // Initialize workflow context if needed
-    if (workflowCtx == null)
-    {
+    if (workflowCtx == null) {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
       workflowCtx.setStartTime(System.currentTimeMillis());
     }
 
     // Check parent dependencies
-    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName))
-    {
-      if (workflowCtx.getTaskState(parent) == null || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED))
-      {
+    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
+      if (workflowCtx.getTaskState(parent) == null
+          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
         return emptyAssignment(resourceName);
       }
     }
 
     // Clean up if workflow marked for deletion
     TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE)
-    {
+    if (targetState == TargetState.DELETE) {
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName);
     }
 
     // Check if this workflow has been finished past its expiry.
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis())
-    {
+        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
       markForDeletion(_manager, workflowResource);
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName);
@@ -99,48 +103,42 @@ public class TaskRebalancer implements Rebalancer
 
     // Fetch any existing context information from the property store.
     TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
-    if (taskCtx == null)
-    {
+    if (taskCtx == null) {
       taskCtx = new TaskContext(new ZNRecord("TaskContext"));
       taskCtx.setStartTime(System.currentTimeMillis());
     }
 
     // The task is already in a final state (completed/failed).
     if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
-            || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED)
-    {
+        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
       return emptyAssignment(resourceName);
     }
 
     ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
-    if (prevAssignment == null)
-    {
-      prevAssignment = new ResourceAssignment(resourceName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(resourceId);
     }
 
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of HELIX-230.
+    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
+    // is stored in zk.
+    // Fetch the previous resource assignment from the property store. This is required because of
+    // HELIX-230.
     Set<Integer> partitionsToDrop = new TreeSet<Integer>();
 
-    ResourceAssignment newAssignment = computeResourceMapping(resourceName,
-                                                              workflowCfg,
-                                                              taskCfg,
-                                                              prevAssignment,
-                                                              clusterData.getIdealState(taskCfg.getTargetResource()),
-                                                              clusterData.getLiveInstances().keySet(),
-                                                              currStateOutput,
-                                                              workflowCtx,
-                                                              taskCtx,
-                                                              partitionsToDrop);
-
-    if (!partitionsToDrop.isEmpty())
-    {
-      for (Integer pId : partitionsToDrop)
-      {
+    IdealState targetIs =
+        accessor.getProperty(accessor.keyBuilder().idealStates(taskCfg.getTargetResource()));
+    List<ParticipantId> liveParticipants =
+        Lists.newArrayList(cluster.getLiveParticipantMap().keySet());
+    List<String> rawLiveParticipants =
+        Lists.transform(liveParticipants, Functions.toStringFunction());
+    ResourceAssignment newAssignment =
+        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, targetIs,
+            rawLiveParticipants, currentState, workflowCtx, taskCtx, partitionsToDrop);
+
+    if (!partitionsToDrop.isEmpty()) {
+      for (Integer pId : partitionsToDrop) {
         taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
       }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
       accessor.setProperty(propertyKey, taskIs);
     }
 
@@ -153,30 +151,20 @@ public class TaskRebalancer implements Rebalancer
   }
 
   private static ResourceAssignment computeResourceMapping(String taskResource,
-                                                           WorkflowConfig workflowConfig,
-                                                           TaskConfig taskCfg,
-                                                           ResourceAssignment prevAssignment,
-                                                           IdealState tgtResourceIs,
-                                                           Iterable<String> liveInstances,
-                                                           CurrentStateOutput currStateOutput,
-                                                           WorkflowContext workflowCtx,
-                                                           TaskContext taskCtx,
-                                                           Set<Integer> partitionsToDropFromIs)
-  {
+      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
+      IdealState tgtResourceIs, Iterable<String> liveInstances,
+      ResourceCurrentState currStateOutput, WorkflowContext workflowCtx, TaskContext taskCtx,
+      Set<Integer> partitionsToDropFromIs) {
     TargetState taskTgtState = workflowConfig.getTargetState();
 
     // Update running status in workflow context
-    if (taskTgtState == TargetState.STOP)
-    {
+    if (taskTgtState == TargetState.STOP) {
       workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
       // Workflow has been stopped if all tasks are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig))
-      {
+      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
       }
-    }
-    else
-    {
+    } else {
       workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
       // Workflow is in progress if any task is in progress
       workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -190,134 +178,121 @@ public class TaskRebalancer implements Rebalancer
 
     // Process all the current assignments of task partitions.
     Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
-    Map<String, SortedSet<Integer>> taskAssignments = getTaskPartitionAssignments(liveInstances,
-                                                                                  prevAssignment,
-                                                                                  allPartitions);
-    for (String instance : taskAssignments.keySet())
-    {
+    Map<String, SortedSet<Integer>> taskAssignments =
+        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    for (String instance : taskAssignments.keySet()) {
       Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+      // TASK_ERROR, ERROR.
       Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet)
-      {
+      for (int pId : pSet) {
         final String pName = pName(taskResource, pId);
 
         // Check for pending state transitions on this (partition, instance).
-        String pendingState = currStateOutput.getPendingState(taskResource,
-                                                              new Partition(pName),
-                                                              instance);
-        if (pendingState != null)
-        {
-          // There is a pending state transition for this (partition, instance). Just copy forward the state
+        State pendingState =
+            currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance));
+        if (pendingState != null) {
+          // There is a pending state transition for this (partition, instance). Just copy forward
+          // the state
           // assignment from the previous ideal state.
-          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-          if (stateMap != null)
-          {
-            String prevState = stateMap.get(instance);
-            paMap.put(pId, new PartitionAssignment(instance, prevState));
+          Map<ParticipantId, State> stateMap =
+              prevAssignment.getReplicaMap(PartitionId.from(pName));
+          if (stateMap != null) {
+            State prevState = stateMap.get(ParticipantId.from(instance));
+            paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
             assignedPartitions.add(pId);
-            LOG.debug(String.format("Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-                                   pName,
-                                   instance,
-                                   prevState));
+            LOG.debug(String
+                .format(
+                    "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+                    pName, instance, prevState));
           }
 
           continue;
         }
 
-        TaskPartitionState currState = TaskPartitionState.valueOf(currStateOutput.getCurrentState(taskResource,
-                                                                                                  new Partition(pName),
-                                                                                                  instance));
+        TaskPartitionState currState =
+            TaskPartitionState.valueOf(currStateOutput.getCurrentState(
+                ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance)).toString());
 
         // Process any requested state transitions.
-        String requestedStateStr = currStateOutput.getRequestedState(taskResource,
-                                                                     new Partition(pName),
-                                                                     instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty())
-        {
+        String requestedStateStr =
+            currStateOutput.getRequestedState(ResourceId.from(taskResource),
+                PartitionId.from(pName), ParticipantId.from(instance)).toString();
+        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
           TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState))
-          {
-            LOG.warn(String.format("Requested state %s is the same as the current state for instance %s.",
-                                   requestedState,
-                                   instance));
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
           }
 
           paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
           assignedPartitions.add(pId);
-          LOG.debug(String.format("Instance %s requested a state transition to %s for partition %s.",
-                                 instance,
-                                 requestedState,
-                                 pName));
+          LOG.debug(String.format(
+              "Instance %s requested a state transition to %s for partition %s.", instance,
+              requestedState, pName));
           continue;
         }
 
-        switch (currState)
-        {
-          case RUNNING:
-          case STOPPED:
-          {
-            TaskPartitionState nextState;
-            if (taskTgtState == TargetState.START)
-            {
-              nextState = TaskPartitionState.RUNNING;
-            }
-            else
-            {
-              nextState = TaskPartitionState.STOPPED;
-            }
-
-            paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-            assignedPartitions.add(pId);
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
-                                   pName,
-                                   nextState,
-                                   instance));
+        switch (currState) {
+        case RUNNING:
+        case STOPPED: {
+          TaskPartitionState nextState;
+          if (taskTgtState == TargetState.START) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
           }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+              nextState, instance));
+        }
           break;
-          case COMPLETED:
-          {
-            // The task has completed on this partition. Mark as such in the context object.
-            donePartitions.add(pId);
-            LOG.debug(String.format("Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                                   pName,
-                                   currState));
-            partitionsToDropFromIs.add(pId);
-            markPartitionCompleted(taskCtx, pId);
-          }
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          LOG.debug(String
+              .format(
+                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                  pName, currState));
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(taskCtx, pId);
+        }
           break;
-          case TIMED_OUT:
-          case TASK_ERROR:
-          case ERROR:
-          {
-            donePartitions.add(pId); // The task may be rescheduled on a different instance.
-            LOG.debug(String.format("Task partition %s has error state %s. Marking as such in rebalancer context.",
-                                   pName,
-                                   currState));
-            markPartitionError(taskCtx, pId, currState);
-            // The error policy is to fail the task as soon a single partition fails for a specified maximum number of
-            // attempts.
-            if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition())
-            {
-              workflowCtx.setTaskState(taskResource, TaskState.FAILED);
-              workflowCtx.setWorkflowState(TaskState.FAILED);
-              addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
-              return emptyAssignment(taskResource);
-            }
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          LOG.debug(String.format(
+              "Task partition %s has error state %s. Marking as such in rebalancer context.",
+              pName, currState));
+          markPartitionError(taskCtx, pId, currState);
+          // The error policy is to fail the task as soon as a single partition fails for a
+          // specified maximum number of
+          // attempts.
+          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
+            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+            workflowCtx.setWorkflowState(TaskState.FAILED);
+            addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
+            return emptyAssignment(taskResource);
           }
+        }
           break;
-          case INIT:
-          case DROPPED:
-          {
-            // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-            donePartitions.add(pId);
-            LOG.debug(String.format("Task partition %s has state %s. It will be dropped from the current ideal state.",
-                                   pName,
-                                   currState));
-          }
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          LOG.debug(String.format(
+              "Task partition %s has state %s. It will be dropped from the current ideal state.",
+              pName, currState));
+        }
           break;
-          default:
-            throw new AssertionError("Unknown enum symbol: " + currState);
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
         }
       }
 
@@ -325,60 +300,50 @@ public class TaskRebalancer implements Rebalancer
       pSet.removeAll(donePartitions);
     }
 
-    if (isTaskComplete(taskCtx, allPartitions))
-    {
+    if (isTaskComplete(taskCtx, allPartitions)) {
       workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
-      if (isWorkflowComplete(workflowCtx, workflowConfig))
-      {
+      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.COMPLETED);
         workflowCtx.setFinishTime(System.currentTimeMillis());
       }
     }
 
     // Make additional task assignments if needed.
-    if (taskTgtState == TargetState.START)
-    {
-      // Contains the set of task partitions that must be excluded from consideration when making any new assignments.
+    if (taskTgtState == TargetState.START) {
+      // Contains the set of task partitions that must be excluded from consideration when making
+      // any new assignments.
       // This includes all completed, failed, already assigned partitions.
       Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
       addCompletedPartitions(excludeSet, taskCtx, allPartitions);
       // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments = getTgtPartitionAssignment(currStateOutput,
-                                                                                          liveInstances,
-                                                                                          tgtResourceIs,
-                                                                                          taskCfg.getTargetPartitionStates(),
-                                                                                          allPartitions);
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet())
-      {
+      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+          getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
+              taskCfg.getTargetPartitionStates(), allPartitions);
+      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
         String instance = entry.getKey();
         // Contains the set of task partitions currently assigned to the instance.
         Set<Integer> pSet = entry.getValue();
         int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0)
-        {
-          List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
-                                                           excludeSet,
-                                                           numToAssign);
-          for (Integer pId : nextPartitions)
-          {
+        if (numToAssign > 0) {
+          List<Integer> nextPartitions =
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+          for (Integer pId : nextPartitions) {
             String pName = pName(taskResource, pId);
             paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
             excludeSet.add(pId);
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
-                                   pName,
-                                   TaskPartitionState.RUNNING,
-                                   instance));
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                TaskPartitionState.RUNNING, instance));
           }
         }
       }
     }
 
     // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(taskResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet())
-    {
+    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
       PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(taskResource, e.getKey())), ImmutableMap.of(pa._instance, pa._state));
+      ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
+          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
     }
 
     return ra;
@@ -386,20 +351,16 @@ public class TaskRebalancer implements Rebalancer
 
   /**
    * Checks if the task has completed.
-   *
-   * @param ctx           The rebalancer context.
+   * @param ctx The rebalancer context.
    * @param allPartitions The set of partitions to check.
-   *
-   * @return true if all task partitions have been marked with status {@link TaskPartitionState#COMPLETED} in the rebalancer
+   * @return true if all task partitions have been marked with status
+   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
    *         context, false otherwise.
    */
-  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions)
-  {
-    for (Integer pId : allPartitions)
-    {
+  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+    for (Integer pId : allPartitions) {
       TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state != TaskPartitionState.COMPLETED)
-      {
+      if (state != TaskPartitionState.COMPLETED) {
         return false;
       }
     }
@@ -408,18 +369,13 @@ public class TaskRebalancer implements Rebalancer
 
   /**
    * Checks if the workflow has completed.
-   *
    * @param ctx Workflow context containing task states
    * @param cfg Workflow config containing set of tasks
-   *
    * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
    */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg)
-  {
-    for (String task : cfg.getTaskDag().getAllNodes())
-    {
-      if(ctx.getTaskState(task) != TaskState.COMPLETED)
-      {
+  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String task : cfg.getTaskDag().getAllNodes()) {
+      if (ctx.getTaskState(task) != TaskState.COMPLETED) {
         return false;
       }
     }
@@ -428,139 +384,122 @@ public class TaskRebalancer implements Rebalancer
 
   /**
    * Checks if the workflow has been stopped.
-   *
    * @param ctx Workflow context containing task states
    * @param cfg Workflow config containing set of tasks
-   *
    * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
    */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg)
-  {
-    for (String task : cfg.getTaskDag().getAllNodes())
-    {
-      if(ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null)
-      {
+  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String task : cfg.getTaskDag().getAllNodes()) {
+      if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
         return false;
       }
     }
     return true;
   }
 
-  private static void markForDeletion(HelixManager mgr, String resourceName)
-  {
-    mgr.getConfigAccessor().set(TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-                                WorkflowConfig.TARGET_STATE,
-                                TargetState.DELETE.name());
+  private static void markForDeletion(HelixManager mgr, String resourceName) {
+    mgr.getConfigAccessor().set(
+        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
+        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
   }
 
   /**
-   * Cleans up all Helix state associated with this task, wiping workflow-level information if this is the last
+   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
+   * is the last
    * remaining task in its workflow.
    */
-  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg, String workflowResource)
-  {
+  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
+      String workflowResource) {
     HelixDataAccessor accessor = mgr.getHelixDataAccessor();
     // Delete resource configs.
     PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey))
-    {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-          resourceName,
-          cfgKey));
+    if (!accessor.removeProperty(cfgKey)) {
+      throw new RuntimeException(
+          String
+              .format(
+                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  resourceName, cfgKey));
     }
     // Delete property store information for this resource.
     String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT))
-    {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-          resourceName,
-          propStoreKey));
+    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+      throw new RuntimeException(
+          String
+              .format(
+                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  resourceName, propStoreKey));
     }
     // Finally, delete the ideal state itself.
     PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey))
-    {
+    if (!accessor.removeProperty(isKey)) {
       throw new RuntimeException(String.format(
           "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
-          resourceName,
-          isKey));
+          resourceName, isKey));
     }
     LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
 
     boolean lastInWorkflow = true;
-    for(String task : cfg.getTaskDag().getAllNodes())
-    {
+    for (String task : cfg.getTaskDag().getAllNodes()) {
       // check if property store information or resource configs exist for this task
-      if(mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task), AccessOption.PERSISTENT)
-              || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
-              || accessor.getProperty(getISPropertyKey(accessor, task)) != null)
-      {
+      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
+          AccessOption.PERSISTENT)
+          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
+          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
         lastInWorkflow = false;
       }
     }
 
     // clean up task-level info if this was the last in workflow
-    if(lastInWorkflow)
-    {
+    if (lastInWorkflow) {
       // delete workflow config
       PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey))
-      {
-        throw new RuntimeException(String.format(
-                "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                workflowResource,
-                workflowCfgKey));
+      if (!accessor.removeProperty(workflowCfgKey)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowCfgKey));
       }
       // Delete property store information for this workflow
       String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT))
-      {
-        throw new RuntimeException(String.format(
-                "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                workflowResource,
-                workflowPropStoreKey));
+      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowPropStoreKey));
       }
     }
 
   }
 
-  private static String getRebalancerPropStoreKey(String resource)
-  {
+  private static String getRebalancerPropStoreKey(String resource) {
     return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
   }
 
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource)
-  {
+  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
     return accessor.keyBuilder().idealStates(resource);
   }
 
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource)
-  {
+  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
     return accessor.keyBuilder().resourceConfig(resource);
   }
 
-  private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds)
-  {
-    for (String pName : pNames)
-    {
+  private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
+    for (String pName : pNames) {
       pIds.add(pId(pName));
     }
   }
 
-  private static ResourceAssignment emptyAssignment(String name)
-  {
-    return new ResourceAssignment(name);
+  private static ResourceAssignment emptyAssignment(String name) {
+    return new ResourceAssignment(ResourceId.from(name));
   }
 
-  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx, Iterable<Integer> pIds)
-  {
-    for (Integer pId : pIds)
-    {
+  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+      Iterable<Integer> pIds) {
+    for (Integer pId : pIds) {
       TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED)
-      {
+      if (state == TaskPartitionState.COMPLETED) {
         set.add(pId);
       }
     }
@@ -569,23 +508,17 @@ public class TaskRebalancer implements Rebalancer
   /**
    * Returns the set of all partition ids for a task.
    * <p/>
-   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we use the list of all
-   * partition ids from the target resource.
+   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+   * use the list of all partition ids from the target resource.
    */
-  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg)
-  {
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
     Set<Integer> taskPartitions = new HashSet<Integer>();
-    if (taskCfg.getTargetPartitions() != null)
-    {
-      for (Integer pId : taskCfg.getTargetPartitions())
-      {
+    if (taskCfg.getTargetPartitions() != null) {
+      for (Integer pId : taskCfg.getTargetPartitions()) {
         taskPartitions.add(pId);
       }
-    }
-    else
-    {
-      for (String pName : tgtResourceIs.getPartitionSet())
-      {
+    } else {
+      for (String pName : tgtResourceIs.getPartitionSet()) {
         taskPartitions.add(pId(pName));
       }
     }
@@ -593,18 +526,15 @@ public class TaskRebalancer implements Rebalancer
     return taskPartitions;
   }
 
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, Set<Integer> excluded, int n)
-  {
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, int n) {
     List<Integer> result = new ArrayList<Integer>(n);
-    for (Integer pId : candidatePartitions)
-    {
-      if (result.size() >= n)
-      {
+    for (Integer pId : candidatePartitions) {
+      if (result.size() >= n) {
         break;
       }
 
-      if (!excluded.contains(pId))
-      {
+      if (!excluded.contains(pId)) {
         result.add(pId);
       }
     }
@@ -612,15 +542,13 @@ public class TaskRebalancer implements Rebalancer
     return result;
   }
 
-  private static void markPartitionCompleted(TaskContext ctx, int pId)
-  {
+  private static void markPartitionCompleted(TaskContext ctx, int pId) {
     ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
     ctx.incrementNumAttempts(pId);
   }
 
-  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state)
-  {
+  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
     ctx.setPartitionState(pId, state);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
     ctx.incrementNumAttempts(pId);
@@ -628,38 +556,31 @@ public class TaskRebalancer implements Rebalancer
 
   /**
    * Get partition assignments for the target resource, but only for the partitions of interest.
-   *
    * @param currStateOutput The current state of the instances in the cluster.
-   * @param instanceList    The set of instances.
-   * @param tgtIs           The ideal state of the target resource.
-   * @param tgtStates       Only partitions in this set of states will be considered. If null, partitions do not need to
-   *                        be in any specific state to be considered.
-   * @param includeSet      The set of partitions to consider.
-   *
+   * @param instanceList The set of instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+   *          do not need to
+   *          be in any specific state to be considered.
+   * @param includeSet The set of partitions to consider.
    * @return A map of instance vs set of partition ids assigned to that instance.
    */
-  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(CurrentStateOutput currStateOutput,
-                                                                           Iterable<String> instanceList,
-                                                                           IdealState tgtIs,
-                                                                           Set<String> tgtStates,
-                                                                           Set<Integer> includeSet)
-  {
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      ResourceCurrentState currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList)
-    {
+    for (String instance : instanceList) {
       result.put(instance, new TreeSet<Integer>());
     }
 
-    for (String pName : tgtIs.getPartitionSet())
-    {
+    for (String pName : tgtIs.getPartitionSet()) {
       int pId = pId(pName);
-      if (includeSet.contains(pId))
-      {
-        for (String instance : instanceList)
-        {
-          String state = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), instance);
-          if (tgtStates == null || tgtStates.contains(state))
-          {
+      if (includeSet.contains(pId)) {
+        for (String instance : instanceList) {
+          State state =
+              currStateOutput.getCurrentState(tgtIs.getResourceId(), PartitionId.from(pName),
+                  ParticipantId.from(instance));
+          if (tgtStates == null || tgtStates.contains(state.toString())) {
             result.get(instance).add(pId);
           }
         }
@@ -672,27 +593,20 @@ public class TaskRebalancer implements Rebalancer
   /**
    * Return the assignment of task partitions per instance.
    */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(Iterable<String> instanceList,
-                                                                             ResourceAssignment assignment,
-                                                                             Set<Integer> includeSet)
-  {
+  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList)
-    {
+    for (String instance : instanceList) {
       result.put(instance, new TreeSet<Integer>());
     }
 
-    for (Partition partition : assignment.getMappedPartitions())
-    {
-      int pId = pId(partition.getPartitionName());
-      if (includeSet.contains(pId))
-      {
-        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet())
-        {
+    for (PartitionId partition : assignment.getMappedPartitionIds()) {
+      int pId = pId(partition.toString());
+      if (includeSet.contains(pId)) {
+        Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partition);
+        for (ParticipantId instance : replicaMap.keySet()) {
           SortedSet<Integer> pList = result.get(instance);
-          if (pList != null)
-          {
+          if (pList != null) {
             pList.add(pId);
           }
         }
@@ -705,16 +619,14 @@ public class TaskRebalancer implements Rebalancer
   /**
    * Computes the partition name given the resource name and partition id.
    */
-  private static String pName(String resource, int pId)
-  {
+  private static String pName(String resource, int pId) {
     return resource + "_" + pId;
   }
 
   /**
    * Extracts the partition id from the given partition name.
    */
-  private static int pId(String pName)
-  {
+  private static int pId(String pName) {
     String[] tokens = pName.split("_");
     return Integer.valueOf(tokens[tokens.length - 1]);
   }
@@ -722,13 +634,11 @@ public class TaskRebalancer implements Rebalancer
   /**
    * An (instance, state) pair.
    */
-  private static class PartitionAssignment
-  {
+  private static class PartitionAssignment {
     private final String _instance;
     private final String _state;
 
-    private PartitionAssignment(String instance, String state)
-    {
+    private PartitionAssignment(String instance, String state) {
       _instance = instance;
       _state = state;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d7b235e..741ed4d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -3,11 +3,10 @@
  */
 package org.apache.helix.task;
 
-
-import com.google.common.base.Joiner;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
@@ -20,139 +19,124 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
 
 /**
  * Static utility methods.
- *
- * @author Abe <as...@linkedin.com>
- * @version $Revision$
  */
-public class TaskUtil
-{
+public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
   private static final String CONTEXT_NODE = "Context";
   private static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   /**
    * Parses task resource configurations in Helix into a {@link TaskConfig} object.
-   *
-   * @param manager      HelixManager object used to connect to Helix.
+   * @param manager HelixManager object used to connect to Helix.
    * @param taskResource The name of the task resource.
-   *
-   * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null otherwise.
+   * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null
+   *         otherwise.
    */
-  public static TaskConfig getTaskCfg(HelixManager manager, String taskResource)
-  {
+  public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
     Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
     TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
 
     return b.build();
   }
 
-  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource)
-  {
+  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
     Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
     WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
 
     return b.build();
   }
-  public static boolean setRequestedState(HelixDataAccessor accessor,
-                                          String instance,
-                                          String sessionId,
-                                          String resource,
-                                          String partition,
-                                          TaskPartitionState state)
-  {
-    LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state, partition));
-    try
-    {
+
+  public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
+      String sessionId, String resource, String partition, TaskPartitionState state) {
+    LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
+        partition));
+    try {
       PropertyKey.Builder keyBuilder = accessor.keyBuilder();
       PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
       CurrentState currStateDelta = new CurrentState(resource);
       currStateDelta.setRequestedState(partition, state.name());
 
       return accessor.updateProperty(key, currStateDelta);
-    }
-    catch (Exception e)
-    {
-      LOG.error(String.format("Error when requesting a state transition to %s for partition %s.", state, partition), e);
+    } catch (Exception e) {
+      LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
+          state, partition), e);
       return false;
     }
   }
 
-  public static HelixConfigScope getResourceConfigScope(String clusterName, String resource)
-  {
+  public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
     return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
-          .forCluster(clusterName).forResource(resource).build();
+        .forCluster(clusterName).forResource(resource).build();
   }
 
-  public static ResourceAssignment getPrevResourceAssignment(HelixManager manager, String resourceName)
-  {
-    ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-                                                                         resourceName, PREV_RA_NODE), null, AccessOption.PERSISTENT);
+  public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
+      String resourceName) {
+    ZNRecord r =
+        manager.getHelixPropertyStore().get(
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+            null, AccessOption.PERSISTENT);
     return r != null ? new ResourceAssignment(r) : null;
   }
 
-  public static void setPrevResourceAssignment(HelixManager manager, String resourceName, ResourceAssignment ra)
-  {
-    manager.getHelixPropertyStore().set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-                                                            resourceName, PREV_RA_NODE), ra.getRecord(), AccessOption.PERSISTENT);
+  public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
+      ResourceAssignment ra) {
+    manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+        ra.getRecord(), AccessOption.PERSISTENT);
   }
 
-  public static TaskContext getTaskContext(HelixManager manager, String taskResource)
-  {
-    ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-                                                                         taskResource,
-                                                                         CONTEXT_NODE), null, AccessOption.PERSISTENT);
+  public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+    ZNRecord r =
+        manager.getHelixPropertyStore().get(
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+            null, AccessOption.PERSISTENT);
     return r != null ? new TaskContext(r) : null;
   }
 
-  public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx)
-  {
-    manager.getHelixPropertyStore().set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-                                                            taskResource,
-                                                            CONTEXT_NODE), ctx.getRecord(), AccessOption.PERSISTENT);
+  public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+    manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+        ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
-  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource)
-  {
-    ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-            workflowResource,
-            CONTEXT_NODE), null, AccessOption.PERSISTENT);
+  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+    ZNRecord r =
+        manager.getHelixPropertyStore().get(
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
+                CONTEXT_NODE), null, AccessOption.PERSISTENT);
     return r != null ? new WorkflowContext(r) : null;
   }
 
-  public static void setWorkflowContext(HelixManager manager, String workflowResource, WorkflowContext ctx)
-  {
-    manager.getHelixPropertyStore().set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-            workflowResource,
-            CONTEXT_NODE), ctx.getRecord(), AccessOption.PERSISTENT);
+  public static void setWorkflowContext(HelixManager manager, String workflowResource,
+      WorkflowContext ctx) {
+    manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
+        ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
-  public static String getNamespacedTaskName(String singleTaskWorkflow)
-  {
+  public static String getNamespacedTaskName(String singleTaskWorkflow) {
     return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
   }
 
-  public static String getNamespacedTaskName(String workflowResource, String taskName)
-  {
+  public static String getNamespacedTaskName(String workflowResource, String taskName) {
     return workflowResource + "_" + taskName;
   }
 
-  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource)
-  {
+  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
     HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
     ConfigAccessor configAccessor = manager.getConfigAccessor();
 
     Map<String, String> taskCfg = new HashMap<String, String>();
     List<String> cfgKeys = configAccessor.getKeys(scope);
-    if (cfgKeys == null || cfgKeys.isEmpty())
-    {
+    if (cfgKeys == null || cfgKeys.isEmpty()) {
       return null;
     }
 
-    for (String cfgKey : cfgKeys)
-    {
+    for (String cfgKey : cfgKeys) {
       taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index dec884b..7db753c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -3,17 +3,30 @@
  */
 package org.apache.helix.integration.task;
 
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.helix.*;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.task.*;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -21,142 +34,120 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 
 /**
  * @author Abe <as...@linkedin.com>
  * @version $Revision$
  */
-public class TestTaskRebalancer extends ZkIntegrationTestBase
-{
+public class TestTaskRebalancer extends ZkIntegrationTestBase {
   private static final int NUM_NODES = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String, TestHelper.StartCMResult>();
+  // private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String,
+  // TestHelper.StartCMResult>();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES];
+  ClusterControllerManager _controller;
   private HelixManager _manager;
   private TaskDriver _driver;
 
   @BeforeClass
-  public void beforeClass()
-      throws Exception
-  {
+  public void beforeClass() throws Exception {
     String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace))
-    {
+    if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);
     }
 
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < NUM_NODES; i++)
-    {
+    for (int i = 0; i < NUM_NODES; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
 
     // Set up target db
-    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL);
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory()
-    {
+    taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config)
-      {
+      public Task createNewTask(String config) {
         return new ReindexTask(config);
       }
     });
 
     // start dummy participants
-    for (int i = 0; i < NUM_NODES; i++)
-    {
+    for (int i = 0; i < NUM_NODES; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      TestHelper.StartCMResult result = TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
-      _startCMResultMap.put(instanceName, result);
+      // TestHelper.StartCMResult result =
+      _participants[i] =
+          TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
+      // _startCMResultMap.put(instanceName, result);
     }
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    TestHelper.StartCMResult startResult = TestHelper.startController(CLUSTER_NAME,
-                                                                      controllerName,
-                                                                      ZK_ADDR,
-                                                                      HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
-
+    // TestHelper.StartCMResult startResult =
+    // TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
+    // HelixControllerMain.STANDALONE);
+    // _startCMResultMap.put(controllerName, startResult);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
     // create cluster manager
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
     _manager.connect();
     _driver = new TaskDriver(_manager);
 
-    boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR,
-                                                                                                                CLUSTER_NAME));
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+            ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
 
-    result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                           CLUSTER_NAME));
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
     Assert.assertTrue(result);
   }
 
   @AfterClass
-  public void afterClass()
-      throws Exception
-  {
+  public void afterClass() throws Exception {
     /**
      * shutdown order: 1) disconnect the controller 2) disconnect participants
      */
-
-    TestHelper.StartCMResult result;
-    Iterator<Map.Entry<String, TestHelper.StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      String instanceName = it.next().getKey();
-      if (instanceName.startsWith(CONTROLLER_PREFIX))
-      {
-        result = _startCMResultMap.get(instanceName);
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
-      }
-    }
-
-    Thread.sleep(100);
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      String instanceName = it.next().getKey();
-      result = _startCMResultMap.get(instanceName);
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
     }
-
     _manager.disconnect();
   }
 
   @Test
-  public void basic()
-      throws Exception
-  {
+  public void basic() throws Exception {
     basic(100);
   }
 
   @Test
-  public void zeroTaskCompletionTime()
-      throws Exception
-  {
+  public void zeroTaskCompletionTime() throws Exception {
     basic(0);
   }
 
   @Test
-  public void testExpiry() throws Exception
-  {
+  public void testExpiry() throws Exception {
     String taskName = "Expiry";
     long expiry = 1000;
-    Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+    Workflow flow =
+        WorkflowGenerator
+            .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
+                TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
 
     _driver.start(flow);
     TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
@@ -164,10 +155,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
-    String workflowPropStoreKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+    String workflowPropStoreKey =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
 
     // Ensure context and config exist
-    Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
+    Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+        AccessOption.PERSISTENT));
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
     // Wait for task to finish and expire
@@ -177,18 +170,21 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
     Thread.sleep(expiry);
 
     // Ensure workflow config and context were cleaned up by now
-    Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
+    Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+        AccessOption.PERSISTENT));
     Assert.assertEquals(accessor.getProperty(workflowCfgKey), null);
   }
 
-  private void basic(long taskCompletionTime)
-      throws Exception
-  {
-    // We use a different resource name in each test method as a work around for a helix participant bug where it does
-    // not clear locally cached state when a resource partition is dropped. Once that is fixed we should change these
-    // tests to use the same resource name and implement a beforeMethod that deletes the task resource.
+  private void basic(long taskCompletionTime) throws Exception {
+    // We use a different resource name in each test method as a work around for a helix participant
+    // bug where it does
+    // not clear locally cached state when a resource partition is dropped. Once that is fixed we
+    // should change these
+    // tests to use the same resource name and implement a beforeMethod that deletes the task
+    // resource.
     final String taskResource = "basic" + taskCompletionTime;
-    Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
             TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
     _driver.start(flow);
 
@@ -196,26 +192,25 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
     TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
-    TaskContext ctx = TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
-    for (int i = 0; i < NUM_PARTITIONS; i++)
-    {
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
     }
   }
 
   @Test
-  public void partitionSet()
-      throws Exception
-  {
+  public void partitionSet() throws Exception {
     final String taskResource = "partitionSet";
     ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
 
     // construct and submit our basic workflow
-    Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100),
-            TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(1),
-            TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
+            String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
+            .build();
     _driver.start(flow);
 
     // wait for task completeness/timeout
@@ -229,36 +224,33 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
     Assert.assertNotNull(ctx);
     Assert.assertNotNull(workflowContext);
     Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
-    for (int i : targetPartitions)
-    {
+    for (int i : targetPartitions) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
     }
   }
 
   @Test
-  public void testRepeatedWorkflow() throws Exception
-  {
+  public void testRepeatedWorkflow() throws Exception {
     String workflowName = "SomeWorkflow";
-    Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+    Workflow flow =
+        WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
     new TaskDriver(_manager).start(flow);
 
     // Wait until the task completes
     TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
-    for(String task : flow.getTaskConfigs().keySet())
-    {
+    for (String task : flow.getTaskConfigs().keySet()) {
       TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
     }
   }
 
   @Test
-  public void timeouts()
-      throws Exception
-  {
+  public void timeouts() throws Exception {
     final String taskResource = "timeouts";
-    Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
             TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
             TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
     _driver.start(flow);
@@ -267,13 +259,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
     TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
-    TaskContext ctx = TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
     int maxAttempts = 0;
-    for (int i = 0; i < NUM_PARTITIONS; i++)
-    {
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
-      if (state != null)
-      {
+      if (state != null) {
         Assert.assertEquals(state, TaskPartitionState.TIMED_OUT);
         maxAttempts = Math.max(maxAttempts, ctx.getPartitionNumAttempts(i));
       }
@@ -281,48 +272,40 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
     Assert.assertEquals(maxAttempts, 2);
   }
 
-  private static class ReindexTask implements Task
-  {
+  private static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg)
-    {
+    public ReindexTask(String cfg) {
       _delay = Long.parseLong(cfg);
     }
 
     @Override
-    public TaskResult run()
-    {
+    public TaskResult run() {
       long expiry = System.currentTimeMillis() + _delay;
       long timeLeft;
-      while (System.currentTimeMillis() < expiry)
-      {
-        if (_canceled)
-        {
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
           timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
         }
         sleep(50);
       }
       timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
     }
 
     @Override
-    public void cancel()
-    {
+    public void cancel() {
       _canceled = true;
     }
 
-    private static void sleep(long d)
-    {
-      try
-      {
+    private static void sleep(long d) {
+      try {
         Thread.sleep(d);
-      }
-      catch (InterruptedException e)
-      {
+      } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 4c17397..05ed55e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -3,18 +3,23 @@
  */
 package org.apache.helix.integration.task;
 
-
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
+
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.task.*;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -23,13 +28,11 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 /**
  * @author Abe <as...@linkedin.com>
  * @version $Revision$
  */
-public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
-{
+public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
   private static final Logger LOG = Logger.getLogger(ZkStandAloneCMTestBase.class);
   private static final int NUM_NODES = 5;
   private static final int START_PORT = 12918;
@@ -39,24 +42,23 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String, TestHelper.StartCMResult>();
+  // private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String,
+  // TestHelper.StartCMResult>();
+  private MockParticipantManager _participants[] = new MockParticipantManager[NUM_NODES];
+  private ClusterControllerManager _controller;
   private HelixManager _manager;
   private TaskDriver _driver;
 
   @BeforeClass
-  public void beforeClass()
-      throws Exception
-  {
+  public void beforeClass() throws Exception {
     String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace))
-    {
+    if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);
     }
 
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < NUM_NODES; i++)
-    {
+    for (int i = 0; i < NUM_NODES; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
@@ -66,87 +68,69 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory()
-    {
+    taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config)
-      {
+      public Task createNewTask(String config) {
         return new ReindexTask(config);
       }
     });
 
     // start dummy participants
-    for (int i = 0; i < NUM_NODES; i++)
-    {
+    for (int i = 0; i < NUM_NODES; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      TestHelper.StartCMResult result = TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
-      _startCMResultMap.put(instanceName, result);
+      // TestHelper.StartCMResult result =
+      _participants[i] =
+          TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
+      // _startCMResultMap.put(instanceName, result);
     }
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    TestHelper.StartCMResult startResult = TestHelper.startController(CLUSTER_NAME,
-                                                                      controllerName,
-                                                                      ZK_ADDR,
-                                                                      HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    // TestHelper.StartCMResult startResult =
+    // TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
+    // HelixControllerMain.STANDALONE);
+    // _startCMResultMap.put(controllerName, startResult);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
     // create cluster manager
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
     _manager.connect();
 
     _driver = new TaskDriver(_manager);
 
-    boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR,
-                                                                                                                CLUSTER_NAME));
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+            ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
 
-    result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                           CLUSTER_NAME));
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
     Assert.assertTrue(result);
   }
 
   @AfterClass
-  public void afterClass()
-      throws Exception
-  {
+  public void afterClass() throws Exception {
     /**
      * shutdown order: 1) disconnect the controller 2) disconnect participants
      */
 
-    TestHelper.StartCMResult result;
-    Iterator<Map.Entry<String, TestHelper.StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      String instanceName = it.next().getKey();
-      if (instanceName.startsWith(CONTROLLER_PREFIX))
-      {
-        result = _startCMResultMap.get(instanceName);
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
-      }
-    }
-
-    Thread.sleep(100);
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      String instanceName = it.next().getKey();
-      result = _startCMResultMap.get(instanceName);
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
     }
 
     _manager.disconnect();
   }
 
   @Test
-  public void stopAndResume()
-      throws Exception
-  {
-    Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
+  public void stopAndResume() throws Exception {
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
             TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
 
     LOG.info("Starting flow " + flow.getName());
@@ -163,9 +147,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
   }
 
   @Test
-  public void stopAndResumeWorkflow()
-          throws Exception
-  {
+  public void stopAndResumeWorkflow() throws Exception {
     String workflow = "SomeWorkflow";
     Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
 
@@ -182,48 +164,40 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
     TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
   }
 
-  public static class ReindexTask implements Task
-  {
+  public static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg)
-    {
+    public ReindexTask(String cfg) {
       _delay = Long.parseLong(cfg);
     }
 
     @Override
-    public TaskResult run()
-    {
+    public TaskResult run() {
       long expiry = System.currentTimeMillis() + _delay;
       long timeLeft;
-      while (System.currentTimeMillis() < expiry)
-      {
-        if (_canceled)
-        {
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
           timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
         }
         sleep(50);
       }
       timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
     }
 
     @Override
-    public void cancel()
-    {
+    public void cancel() {
       _canceled = true;
     }
 
-    private static void sleep(long d)
-    {
-      try
-      {
+    private static void sleep(long d) {
+      try {
         Thread.sleep(d);
-      }
-      catch (InterruptedException e)
-      {
+      } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }