You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:03 UTC

[07/33] helix git commit: Refactor the TaskUtil class to move as many methods as possible out of the class, and make the remaining methods internal APIs where possible. Expose the necessary APIs in TaskDriver instead.

Refactor the TaskUtil class to move as many methods as possible out of the class, and make the remaining methods internal APIs where possible. Expose the necessary APIs in TaskDriver instead.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/579d82fd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/579d82fd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/579d82fd

Branch: refs/heads/helix-0.6.x
Commit: 579d82fd2aa8fdce8ec0e0c4d6da73cb8209729d
Parents: 6d42db4
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 23 17:32:35 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:44:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/JobRebalancer.java    |   6 +-
 .../org/apache/helix/task/TaskRebalancer.java   |  36 ++++
 .../java/org/apache/helix/task/TaskRunner.java  |  37 +++-
 .../java/org/apache/helix/task/TaskUtil.java    | 178 +++----------------
 .../apache/helix/task/WorkflowRebalancer.java   |  89 +++++++++-
 .../helix/integration/task/TaskTestUtil.java    |  35 ++--
 .../task/TestDisableJobExternalView.java        |   2 +-
 .../task/TestIndependentTaskRebalancer.java     |  25 ++-
 .../integration/task/TestRecurringJobQueue.java |  36 ++--
 .../task/TestRunJobsWithMissingTarget.java      |  11 +-
 .../integration/task/TestTaskRebalancer.java    |  32 ++--
 .../task/TestTaskRebalancerFailover.java        |  12 +-
 .../task/TestTaskRebalancerParallel.java        |   2 +-
 .../task/TestTaskRebalancerRetryLimit.java      |   4 +-
 .../task/TestTaskRebalancerStopResume.java      |  78 ++++----
 .../integration/task/TestUpdateWorkflow.java    |  19 +-
 16 files changed, 306 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 93d4689..7eeafc7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -90,7 +90,7 @@ public class JobRebalancer extends TaskRebalancer {
     // The job is already in a final state (completed/failed).
     if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
       LOG.info("Job " + jobName + " is failed or already completed, clean up IS.");
-      TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       _scheduledRebalancer.removeScheduledRebalance(jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
@@ -340,7 +340,7 @@ public class JobRebalancer extends TaskRebalancer {
               addAllPartitions(allPartitions, partitionsToDropFromIs);
 
               // remove IdealState of this job
-              TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+              cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
               return buildEmptyAssignment(jobResource, currStateOutput);
             } else {
               skippedPartitions.add(pId);
@@ -376,7 +376,7 @@ public class JobRebalancer extends TaskRebalancer {
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
       // remove IdealState of this job
-      TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }
 
     // Make additional task assignments if needed.

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index f35ce69..b006efc 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -29,8 +29,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -171,6 +173,40 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
+  /**
+   * Cleans up IdealState and external view associated with a job/workflow resource.
+   */
+  protected static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
+    LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
+
+    // Delete the ideal state itself.
+    PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
+    if (accessor.getProperty(isKey) != null) {
+      if (!accessor.removeProperty(isKey)) {
+        LOG.error(String.format(
+            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
+            resourceName, isKey));
+      }
+    } else {
+      LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
+    }
+
+    // Delete dead external view
+    // because job is already completed, there is no more current state change
+    // thus dead external views removal will not be triggered
+    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
+    if (accessor.getProperty(evKey) != null) {
+      if (!accessor.removeProperty(evKey)) {
+        LOG.error(String.format(
+            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
+            resourceName, evKey));
+      }
+    }
+
+    LOG.info(String
+        .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
+  }
+
   @Override public IdealState computeNewIdealState(String resourceName,
       IdealState currentIdealState, CurrentStateOutput currentStateOutput,
       ClusterDataCache clusterData) {

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 7b17043..1bf88ec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -19,7 +19,10 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.task.TaskResult.Status;
 import org.apache.log4j.Logger;
 
@@ -168,8 +171,8 @@ public class TaskRunner implements Runnable {
    */
   private void requestStateTransition(TaskPartitionState state) {
     boolean success =
-        TaskUtil.setRequestedState(_manager.getHelixDataAccessor(), _instance, _sessionId,
-            _taskName, _taskPartition, state);
+        setRequestedState(_manager.getHelixDataAccessor(), _instance, _sessionId, _taskName,
+            _taskPartition, state);
     if (!success) {
       LOG.error(String
           .format(
@@ -177,4 +180,34 @@ public class TaskRunner implements Runnable {
               state, _instance, _sessionId, _taskPartition));
     }
   }
+
+  /**
+   * Request a state change for a specific task.
+   *
+   * @param accessor  connected Helix data accessor
+   * @param instance  the instance serving the task
+   * @param sessionId the current session of the instance
+   * @param resource  the job name
+   * @param partition the task partition name
+   * @param state     the requested state
+   * @return true if the request was persisted, false otherwise
+   */
+  private static boolean setRequestedState(HelixDataAccessor accessor, String instance,
+      String sessionId, String resource, String partition, TaskPartitionState state) {
+    LOG.debug(
+        String.format("Requesting a state transition to %s for partition %s.", state, partition));
+    try {
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
+      CurrentState currStateDelta = new CurrentState(resource);
+      currStateDelta.setRequestedState(partition, state.name());
+
+      return accessor.updateProperty(key, currStateDelta);
+    } catch (Exception e) {
+      LOG.error(String
+          .format("Error when requesting a state transition to %s for partition %s.", state,
+              partition), e);
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ca274d0..49622f3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -55,15 +55,17 @@ import com.google.common.collect.Maps;
 public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
   public static final String CONTEXT_NODE = "Context";
+
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
+   * This method is internal API, please use TaskDriver.getJobConfig();
    *
    * @param accessor    Accessor to access Helix configs
    * @param jobResource The name of the job resource
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    * otherwise.
    */
-  public static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
+  protected static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
     HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
     if (jobResourceConfig == null) {
       return null;
@@ -83,25 +85,27 @@ public class TaskUtil {
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
+   * This method is internal API, please use TaskDriver.getJobConfig();
    *
    * @param manager     HelixManager object used to connect to Helix.
    * @param jobResource The name of the job resource.
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    * otherwise.
    */
-  public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
+  protected static JobConfig getJobCfg(HelixManager manager, String jobResource) {
     return getJobCfg(manager.getHelixDataAccessor(), jobResource);
   }
 
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+   * This method is internal API, please use TaskDriver.getWorkflowConfig();
    *
    * @param accessor  Accessor to access Helix configs
    * @param workflow The name of the workflow.
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  public static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+  protected static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
     HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
     if (workflowCfg == null) {
       return null;
@@ -115,60 +119,32 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+   * This method is internal API, please use TaskDriver.getWorkflowConfig();
    *
    * @param manager          Helix manager object used to connect to Helix.
    * @param workflow The name of the workflow resource.
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
+  protected static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
     return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
   }
 
   /**
-   * Request a state change for a specific task.
-   *
-   * @param accessor  connected Helix data accessor
-   * @param instance  the instance serving the task
-   * @param sessionId the current session of the instance
-   * @param resource  the job name
-   * @param partition the task partition name
-   * @param state     the requested state
-   * @return true if the request was persisted, false otherwise
-   */
-  public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
-      String sessionId, String resource, String partition, TaskPartitionState state) {
-    LOG.debug(
-        String.format("Requesting a state transition to %s for partition %s.", state, partition));
-    try {
-      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
-      CurrentState currStateDelta = new CurrentState(resource);
-      currStateDelta.setRequestedState(partition, state.name());
-
-      return accessor.updateProperty(key, currStateDelta);
-    } catch (Exception e) {
-      LOG.error(String
-          .format("Error when requesting a state transition to %s for partition %s.", state,
-              partition), e);
-      return false;
-    }
-  }
-
-  /**
    * Get a Helix configuration scope at a resource (i.e. job and workflow) level
    *
    * @param clusterName the cluster containing the resource
    * @param resource    the resource name
    * @return instantiated {@link HelixConfigScope}
    */
-  public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
+  protected static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
     return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
         .forCluster(clusterName).forResource(resource).build();
   }
 
   /**
-   * Get the runtime context of a single job
+   * Get the runtime context of a single job.
+   * This method is internal API, please use TaskDriver.getJobContext();
    *
    * @param propertyStore Property store for the cluster
    * @param jobResource   The name of the job
@@ -183,31 +159,34 @@ public class TaskUtil {
   }
 
   /**
-   * Get the runtime context of a single job
+   * Get the runtime context of a single job.
+   * This method is internal API, please use TaskDriver.getJobContext();
    *
    * @param manager     a connection to Helix
    * @param jobResource the name of the job
    * @return the {@link JobContext}, or null if none is available
    */
-  public static JobContext getJobContext(HelixManager manager, String jobResource) {
+  protected static JobContext getJobContext(HelixManager manager, String jobResource) {
     return getJobContext(manager.getHelixPropertyStore(), jobResource);
   }
 
   /**
    * Set the runtime context of a single job
+   * This method is internal API;
    *
    * @param manager     a connection to Helix
    * @param jobResource the name of the job
    * @param ctx         the up-to-date {@link JobContext} for the job
    */
-  public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
+  protected static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
     manager.getHelixPropertyStore()
         .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
             ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
   /**
-   * Get the runtime context of a single workflow
+   * Get the runtime context of a single workflow.
+   * This method is internal API, please use TaskDriver.getWorkflowContext();
    *
    * @param propertyStore    Property store of the cluster
    * @param workflowResource The name of the workflow
@@ -222,13 +201,14 @@ public class TaskUtil {
   }
 
   /**
-   * Get the runtime context of a single workflow
+   * Get the runtime context of a single workflow.
+   * This method is internal API, please use TaskDriver.getWorkflowContext();
    *
    * @param manager          a connection to Helix
    * @param workflowResource the name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
-  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+  protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
     return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
   }
 
@@ -367,126 +347,12 @@ public class TaskUtil {
     return null;
   }
 
-  /**
-   * Create a new workflow based on an existing one
-   *
-   * @param manager          connection to Helix
-   * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName  the name of the new workflow
-   * @param newStartTime     a provided start time that deviates from the desired start time
-   * @return the cloned workflow, or null if there was a problem cloning the existing one
-   */
-  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
-      String newWorkflowName, Date newStartTime) {
-    // Read all resources, including the workflow and jobs of interest
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Map<String, HelixProperty> resourceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
-    if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
-      return null;
-    }
-    if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
-      return null;
-    }
-
-    // Create a new workflow with a new name
-    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
-    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
-    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
-
-    // Set the workflow expiry
-    workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
-
-    // Set the schedule, if applicable
-    ScheduleConfig scheduleConfig;
-    if (newStartTime != null) {
-      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-    } else {
-      scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
-    }
-    if (scheduleConfig != null) {
-      workflowBuilder.setScheduleConfig(scheduleConfig);
-    }
-
-    // Add each job back as long as the original exists
-    Set<String> namespacedJobs = jobDag.getAllNodes();
-    for (String namespacedJob : namespacedJobs) {
-      if (resourceConfigMap.containsKey(namespacedJob)) {
-        // Copy over job-level and task-level configs
-        String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
-        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
-        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-
-        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
-
-        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
-        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
-        List<TaskConfig> taskConfigs = Lists.newLinkedList();
-        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
-          taskConfigs.add(taskConfig);
-        }
-        jobCfgBuilder.addTaskConfigs(taskConfigs);
-        workflowBuilder.addJobConfig(job, jobCfgBuilder);
-
-        // Add dag dependencies
-        Set<String> children = parentsToChildren.get(namespacedJob);
-        if (children != null) {
-          for (String namespacedChild : children) {
-            String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
-            workflowBuilder.addParentChildDependency(job, child);
-          }
-        }
-      }
-    }
-    return workflowBuilder.build();
-  }
-
   private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));
   }
 
   /**
-   * Cleans up IdealState and external view associated with a job/workflow resource.
-   */
-  public static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
-    LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
-    if (accessor.getProperty(isKey) != null) {
-      if (!accessor.removeProperty(isKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
-            resourceName, isKey));
-      }
-    } else {
-      LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    if (accessor.getProperty(evKey) != null) {
-      if (!accessor.removeProperty(evKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
-            resourceName, evKey));
-      }
-    }
-
-    LOG.info(String
-        .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
-  }
-
-  /**
    * Extracts the partition id from the given partition name.
    *
    * @param pName

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index db5426c..05b6dc6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import com.google.common.collect.Lists;
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.*;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -264,8 +265,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
         String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
         LOG.debug("Ready to start workflow " + newWorkflowName);
         if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf = TaskUtil
-              .cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
+          Workflow clonedWf =
+              cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
           TaskDriver driver = new TaskDriver(_manager);
           try {
             // Start the cloned workflow
@@ -298,6 +299,86 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
+   * Create a new workflow based on an existing one
+   *
+   * @param manager          connection to Helix
+   * @param origWorkflowName the name of the existing workflow
+   * @param newWorkflowName  the name of the new workflow
+   * @param newStartTime     a provided start time that deviates from the desired start time
+   * @return the cloned workflow, or null if there was a problem cloning the existing one
+   */
+  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
+      String newWorkflowName, Date newStartTime) {
+    // Read all resources, including the workflow and jobs of interest
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    if (!resourceConfigMap.containsKey(origWorkflowName)) {
+      LOG.error("No such workflow named " + origWorkflowName);
+      return null;
+    }
+    if (resourceConfigMap.containsKey(newWorkflowName)) {
+      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+      return null;
+    }
+
+    // Create a new workflow with a new name
+    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
+    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
+    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
+    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
+
+    // Set the workflow expiry
+    workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+
+    // Set the schedule, if applicable
+    ScheduleConfig scheduleConfig;
+    if (newStartTime != null) {
+      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+    } else {
+      scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields);
+    }
+    if (scheduleConfig != null) {
+      workflowBuilder.setScheduleConfig(scheduleConfig);
+    }
+
+    // Add each job back as long as the original exists
+    Set<String> namespacedJobs = jobDag.getAllNodes();
+    for (String namespacedJob : namespacedJobs) {
+      if (resourceConfigMap.containsKey(namespacedJob)) {
+        // Copy over job-level and task-level configs
+        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
+        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
+        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
+
+        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
+
+        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
+        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
+        List<TaskConfig> taskConfigs = Lists.newLinkedList();
+        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+          taskConfigs.add(taskConfig);
+        }
+        jobCfgBuilder.addTaskConfigs(taskConfigs);
+        workflowBuilder.addJobConfig(job, jobCfgBuilder);
+
+        // Add dag dependencies
+        Set<String> children = parentsToChildren.get(namespacedJob);
+        if (children != null) {
+          for (String namespacedChild : children) {
+            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
+            workflowBuilder.addParentChildDependency(job, child);
+          }
+        }
+      }
+    }
+    return workflowBuilder.build();
+  }
+
+  /**
    * Cleans up workflow configs and workflow contexts associated with this workflow,
    * including all job-level configs and context, plus workflow-level information.
    */
@@ -319,7 +400,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     // clean up workflow-level info if this was the last in workflow
     if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
       // clean up IS & EV
-      TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
+      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
 
       // delete workflow config
       PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
@@ -354,7 +435,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 
     // Remove any idealstate and externalView.
-    TaskUtil.cleanupIdealStateExtView(accessor, job);
+    cleanupIdealStateExtView(accessor, job);
 
     // Remove DAG references in workflow
     PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 06b9751..9796497 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
@@ -53,7 +54,7 @@ public class TaskTestUtil {
    * @param workflowResource Resource to poll for completeness
    * @throws InterruptedException
    */
-  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+  public static void pollForWorkflowState(TaskDriver driver, String workflowResource,
       TaskState... targetStates) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
@@ -61,7 +62,7 @@ public class TaskTestUtil {
     Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
     do {
       Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      ctx = driver.getWorkflowContext(workflowResource);
     } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
         .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + _default_timeout);
 
@@ -73,23 +74,23 @@ public class TaskTestUtil {
 
   /**
    * poll for job until it is at either state in targetStates.
-   * @param manager
+   * @param driver
    * @param workflowResource
    * @param jobName
    * @param targetStates
    * @throws InterruptedException
    */
-  public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
+  public static void pollForJobState(TaskDriver driver, String workflowResource, String jobName,
       TaskState... targetStates) throws InterruptedException {
     // Get workflow config
-    WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
+    WorkflowConfig wfCfg = driver.getWorkflowConfig(workflowResource);
     Assert.assertNotNull(wfCfg);
     WorkflowContext ctx;
     if (wfCfg.isRecurring()) {
       // if it's recurring, need to reconstruct workflow and job name
       do {
         Thread.sleep(100);
-        ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+        ctx = driver.getWorkflowContext(workflowResource);
       } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
       Assert.assertNotNull(ctx);
       Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
@@ -103,7 +104,7 @@ public class TaskTestUtil {
     long st = System.currentTimeMillis();
     do {
       Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      ctx = driver.getWorkflowContext(workflowResource);
     }
     while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(
         ctx.getJobState(jobName)))
@@ -114,27 +115,27 @@ public class TaskTestUtil {
         "expect job states: " + allowedStates + " actual job state: " + jobState);
   }
 
-  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
+  public static void pollForEmptyJobState(final TaskDriver driver, final String workflowName,
       final String jobName) throws Exception {
     final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
     boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
 
       @Override
       public boolean verify() throws Exception {
-        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
+        WorkflowContext ctx = driver.getWorkflowContext(workflowName);
         return ctx == null || ctx.getJobState(namespacedJobName) == null;
       }
     }, _default_timeout);
     Assert.assertTrue(succeed);
   }
 
-  public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
+  public static WorkflowContext pollForWorkflowContext(TaskDriver driver, String workflowResource)
       throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
     do {
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      ctx = driver.getWorkflowContext(workflowResource);
       Thread.sleep(100);
     } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
     Assert.assertNotNull(ctx);
@@ -143,15 +144,15 @@ public class TaskTestUtil {
 
   // 1. Different jobs in a same work flow is in RUNNING at the same time
   // 2. No two jobs in the same work flow is in RUNNING at the same instance
-  public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
+  public static boolean pollForWorkflowParallelState(TaskDriver driver, String workflowName)
       throws InterruptedException {
 
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
+    WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName);
     Assert.assertNotNull(workflowConfig);
 
     WorkflowContext workflowContext = null;
     while (workflowContext == null) {
-      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      workflowContext = driver.getWorkflowContext(workflowName);
       Thread.sleep(100);
     }
 
@@ -162,7 +163,7 @@ public class TaskTestUtil {
       finished = true;
       int runningCount = 0;
 
-      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      workflowContext = driver.getWorkflowContext(workflowName);
       for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
         TaskState jobState = workflowContext.getJobState(jobName);
         if (jobState == TaskState.IN_PROGRESS) {
@@ -177,9 +178,9 @@ public class TaskTestUtil {
 
       List<JobContext> jobContextList = new ArrayList<JobContext>();
       for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
-        JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
+        JobContext jobContext = driver.getJobContext(jobName);
         if (jobContext != null) {
-          jobContextList.add(TaskUtil.getJobContext(manager, jobName));
+          jobContextList.add(driver.getJobContext(jobName));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
index b23e268..f673f7b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
@@ -175,7 +175,7 @@ public class TestDisableJobExternalView extends ZkIntegrationTestBase {
 
     // ensure all jobs are completed
     String namedSpaceJob3 = String.format("%s_%s", queueName, "job3");
-    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob3, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob3, TaskState.COMPLETED);
 
     Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
     String namedSpaceJob1 = String.format("%s_%s", queueName, "job1");

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index ba8367e..1c58776 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -45,7 +45,6 @@ import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskResult.Status;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
@@ -157,8 +156,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -184,8 +183,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -213,8 +212,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -241,8 +240,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -276,13 +275,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
 
     // Check that the workflow only started after the start time (with a 1 second buffer)
-    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, jobName);
+    WorkflowContext workflowCtx = _driver.getWorkflowContext(jobName);
     long startTime = workflowCtx.getStartTime();
     Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
   }
@@ -308,10 +307,10 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure completion
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
 
     // Ensure a single retry happened
-    JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName);
+    JobContext jobCtx = _driver.getJobContext(jobName + "_" + jobName);
     Assert.assertEquals(jobCtx.getPartitionNumAttempts(0), 2);
     Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index d83c5eb..ae3d52d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -179,13 +179,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     _driver.start(queueBuild.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
 
     // ensure job 1 is started before stop it
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
     TaskTestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
+        .pollForJobState(_driver, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
     _driver.delete(queueName);
@@ -208,18 +208,18 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.createQueue(queueBuilder.build());
 
 
-    wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
 
     // ensure jobs are started and completed
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
     TaskTestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
+        .pollForJobState(_driver, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
 
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1));
     TaskTestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
+        .pollForJobState(_driver, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
   }
 
   @Test
@@ -249,21 +249,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     }
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
     TaskTestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+        .pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
             TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -274,21 +274,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS,
         TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
-    TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_driver, scheduledQueue, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -304,9 +304,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
-      JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
+      JobContext jobContext = _driver.getJobContext(namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
       Assert.assertTrue(jobStart >= preJobFinish);
       preJobFinish = jobContext.getFinishTime();
@@ -349,12 +349,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -389,7 +389,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
-    TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+    TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 9fd7735..101604b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -153,7 +153,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i < num_dbs; i++) {
       JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(
+              _test_dbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
       String jobName = "job" + _test_dbs.get(i);
       queueBuilder.enqueueJob(jobName, jobConfig);
@@ -164,8 +165,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
-    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
-    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
   }
 
   @Test
@@ -190,7 +191,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0));
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
-    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
-    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 2d11f85..3a5b179 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -168,7 +168,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
             .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -182,7 +182,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
     // Wait for job to finish and expire
-    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
     TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
     Thread.sleep(expiry);
@@ -212,10 +212,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait for job completion
-    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
-    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
+    JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
@@ -239,13 +239,13 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // wait for job completeness/timeout
-    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
     String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
 
-    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedName);
-    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, jobResource);
+    JobContext ctx = _driver.getJobContext(namespacedName);
+    WorkflowContext workflowContext = _driver.getWorkflowContext(jobResource);
     Assert.assertNotNull(ctx);
     Assert.assertNotNull(workflowContext);
     Assert.assertEquals(workflowContext.getJobState(namespacedName), TaskState.COMPLETED);
@@ -264,11 +264,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     new TaskDriver(_manager).start(flow);
 
     // Wait until the workflow completes
-    TaskTestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
     for (String task : flow.getJobConfigs().keySet()) {
-      TaskTestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_driver, workflowName, task, TaskState.COMPLETED);
     }
   }
 
@@ -284,10 +284,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait until the job reports failure.
-    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
-    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
+    JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     int maxAttempts = 0;
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
@@ -322,10 +322,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Ensure successful completion
     String namespacedJob1 = queueName + "_masterJob";
     String namespacedJob2 = queueName + "_slaveJob";
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
-    JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
-    JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
+    JobContext masterJobContext = _driver.getJobContext(namespacedJob1);
+    JobContext slaveJobContext = _driver.getJobContext(namespacedJob2);
 
     // Ensure correct ordering
     long job1Finish = masterJobContext.getFinishTime();
@@ -340,7 +340,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    WorkflowConfig workflowCfg = _driver.getWorkflowConfig(queueName);
     JobDag dag = workflowCfg.getJobDag();
     Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
     Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index a778dcd..8051b2f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -151,13 +151,13 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
 
     // check all tasks completed on MASTER
     String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     ExternalView ev =
         accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
-    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedJob1);
+    JobContext ctx = _driver.getJobContext(namespacedJob1);
     Set<String> failOverPartitions = Sets.newHashSet();
     for (int p = 0; p < _p; p++) {
       String instanceName = ctx.getAssignedParticipant(p);
@@ -178,12 +178,12 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job);
 
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.IN_PROGRESS);
     _participants[0].syncStop();
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
 
     // tasks previously assigned to localhost_12918 should be re-scheduled on new master
-    ctx = TaskUtil.getJobContext(_manager, namespacedJob2);
+    ctx = _driver.getJobContext(namespacedJob2);
     ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
     for (int p = 0; p < _p; p++) {
       String partitionName = ctx.getTargetForPartition(p);
@@ -204,7 +204,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    WorkflowConfig workflowCfg = _driver.getWorkflowConfig(queueName);
     JobDag dag = workflowCfg.getJobDag();
     Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
     Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index c9a0445..580f5ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -158,6 +158,6 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
       _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i));
     }
 
-    Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_manager, queueName));
+    Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_driver, queueName));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index 8fec899..e576304 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -138,9 +138,9 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
 
-    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
+    JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < _p; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
       if (state != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 7a8d305..30cb460 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -168,15 +168,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing job");
     _driver.stop(JOB_RESOURCE);
-    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.STOPPED);
 
     LOG.info("Resuming job");
     _driver.resume(JOB_RESOURCE);
-    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
@@ -186,15 +186,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
-    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing workflow");
     _driver.stop(workflow);
-    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.STOPPED);
 
     LOG.info("Resuming workflow");
     _driver.resume(workflow);
-    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.COMPLETED);
   }
 
   @Test
@@ -224,27 +224,27 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.IN_PROGRESS);
 
     // stop job1
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED);
 
     // Ensure job2 is not started
     TimeUnit.MILLISECONDS.sleep(200);
     String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
-    TaskTestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
+    TaskTestUtil.pollForEmptyJobState(_driver, queueName, job2Name);
 
     LOG.info("Resuming job-queue: " + queueName);
     _driver.resume(queueName);
 
     // Ensure successful completion
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
-    JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
-    JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
+    JobContext masterJobContext = _driver.getJobContext(namespacedJob1);
+    JobContext slaveJobContext = _driver.getJobContext(namespacedJob2);
 
     // Ensure correct ordering
     long job1Finish = masterJobContext.getFinishTime();
@@ -290,13 +290,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
-    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -307,19 +307,19 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     // ensure job 2 is started
     TaskTestUtil
-        .pollForJobState(_manager, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
+        .pollForJobState(_driver, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
     TaskTestUtil
-        .pollForJobState(_manager, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+        .pollForJobState(_driver, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2);
-    TaskTestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_driver, queueName, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -343,9 +343,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJobName, TaskState.COMPLETED);
 
-      JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
+      JobContext jobContext = _driver.getJobContext(namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
       Assert.assertTrue(jobStart >= preJobFinish);
       preJobFinish = jobContext.getFinishTime();
@@ -391,20 +391,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
     TaskTestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+        .pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -415,20 +415,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
-    TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_driver, scheduledQueue, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -444,9 +444,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
-      JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
+      JobContext jobContext = _driver.getJobContext(namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
       Assert.assertTrue(jobStart >= preJobFinish);
       preJobFinish = jobContext.getFinishTime();
@@ -487,12 +487,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -535,10 +535,10 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
 
     String namespacedJob2 = String.format("%s_%s", queueName,  job2Name);
-    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
 
     // Stop queue
     _driver.stop(queueName);
@@ -588,11 +588,11 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
-    TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+    TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName);
   }
 
   private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    WorkflowConfig workflowCfg = _driver.getWorkflowConfig(queueName);
     JobDag dag = workflowCfg.getJobDag();
     Assert.assertFalse(dag.getAllNodes().contains(namedSpacedJobName));
     Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));

http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
index fc93392..964f9e1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -19,13 +19,10 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.integration.ZkIntegrationTestBase;
@@ -35,7 +32,6 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
 import org.apache.helix.task.Task;
@@ -44,8 +40,6 @@ import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
@@ -58,7 +52,6 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -180,9 +173,9 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
 
     _driver.start(queueBuild.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
 
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_manager, queueName);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
     WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowConfig);
 
     Calendar startTime = Calendar.getInstance();
@@ -195,18 +188,18 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
 
     // ensure current schedule is started
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.IN_PROGRESS);
 
     _driver.updateWorkflow(queueName, configBuilder.build());
 
     // ensure current schedule is completed
-    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
 
     Thread.sleep(1000);
 
-    wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, scheduledQueue);
+    WorkflowConfig wCfg = _driver.getWorkflowConfig(scheduledQueue);
 
     Calendar configStartTime = Calendar.getInstance();
     configStartTime.setTime(wCfg.getStartTime());