You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/11/30 18:19:06 UTC

[3/3] helix git commit: [HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.

[HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bbb20be
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bbb20be
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bbb20be

Branch: refs/heads/helix-0.6.x
Commit: 7bbb20be67a939a57f33d8f6d7c814b1dc246575
Parents: 7569a0a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:54:34 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:54:34 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobQueue.java    | 66 ++++++++++----------
 .../java/org/apache/helix/task/TaskDriver.java  | 42 ++++++++-----
 .../java/org/apache/helix/task/TaskRunner.java  |  5 +-
 .../org/apache/helix/task/TaskStateModel.java   |  8 +--
 .../java/org/apache/helix/task/Workflow.java    | 55 +++++++++++-----
 .../org/apache/helix/task/WorkflowConfig.java   | 14 +++++
 .../task/TestIndependentTaskRebalancer.java     |  1 -
 .../integration/task/TestRecurringJobQueue.java | 52 ++++++++-------
 .../task/TestTaskRebalancerParallel.java        |  4 +-
 9 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index bca5911..0280c88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -19,31 +19,26 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
  * A named queue to which jobs can be added
  */
-public class JobQueue extends WorkflowConfig {
+public class JobQueue extends Workflow {
   /* Config fields */
   public static final String CAPACITY = "CAPACITY";
 
-  private final String _name;
   private final int _capacity;
 
-  private JobQueue(String name, int capacity, WorkflowConfig config) {
-    super(config.getJobDag(), config.getParallelJobs(), config.getTargetState(), config.getExpiry(),
-        config.isTerminable(), config.getScheduleConfig());
-    _name = name;
+  private JobQueue(String name, int capacity, WorkflowConfig workflowConfig,
+      Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
+    super(name, workflowConfig, jobConfigs, taskConfigs);
     _capacity = capacity;
-  }
-
-  /**
-   * Get the name of this queue
-   * @return queue name
-   */
-  public String getName() {
-    return _name;
+    validate();
   }
 
   /**
@@ -54,31 +49,24 @@ public class JobQueue extends WorkflowConfig {
     return _capacity;
   }
 
-  @Override
   public Map<String, String> getResourceConfigMap() throws Exception {
-    Map<String, String> cfgMap = super.getResourceConfigMap();
+    Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap();
     cfgMap.put(CAPACITY, String.valueOf(_capacity));
     return cfgMap;
   }
 
-  /** Supports creation of a single empty queue */
-  public static class Builder {
-    private WorkflowConfig.Builder _builder;
-    private final String _name;
+  /** Supports creation of a single queue */
+  public static class Builder extends Workflow.Builder {
     private int _capacity = Integer.MAX_VALUE;
+    private List<String> jobs;
 
     public Builder(String name) {
-      _builder = new WorkflowConfig.Builder();
-      _name = name;
-    }
-
-    public Builder parallelJobs(int parallelJobs) {
-      _builder.setParallelJobs(parallelJobs);
-      return this;
+      super(name);
+      jobs = new ArrayList<String>();
     }
 
     public Builder expiry(long expiry) {
-      _builder.setExpiry(expiry);
+      _expiry = expiry;
       return this;
     }
 
@@ -87,18 +75,32 @@ public class JobQueue extends WorkflowConfig {
       return this;
     }
 
+    @Override
     public Builder fromMap(Map<String, String> cfg) {
-      _builder = WorkflowConfig.Builder.fromMap(cfg);
+      super.fromMap(cfg);
       if (cfg.containsKey(CAPACITY)) {
         _capacity = Integer.parseInt(cfg.get(CAPACITY));
       }
       return this;
     }
 
+    public void enqueueJob(final String job, JobConfig.Builder jobBuilder) {
+      if (jobs.size() >= _capacity) {
+        throw new HelixException("Failed to push new job to jobQueue, it is already full");
+      }
+      addJobConfig(job, jobBuilder);
+      if (jobs.size() > 0) {
+        String previousJob = jobs.get(jobs.size() - 1);
+        addParentChildDependency(previousJob, job);
+      }
+      jobs.add(job);
+    }
+
     public JobQueue build() {
-      _builder.setTerminable(false);
-      WorkflowConfig workflowConfig = _builder.build();
-      return new JobQueue(_name, _capacity, workflowConfig);
+      WorkflowConfig.Builder builder = buildWorkflowConfig();
+      builder.setTerminable(false);
+      WorkflowConfig workflowConfig = builder.build();
+      return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index cc1eac1..654ba4e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -191,8 +191,8 @@ public class TaskDriver {
     String flowName = flow.getName();
 
     // first, add workflow config to ZK
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), flow
-        .getWorkflowConfig().getResourceConfigMap());
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+        flow.getWorkflowConfig().getResourceConfigMap());
 
     // then schedule jobs
     for (String job : flow.getJobConfigs().keySet()) {
@@ -206,14 +206,7 @@ public class TaskDriver {
 
   /** Creates a new named job queue (workflow) */
   public void createQueue(JobQueue queue) throws Exception {
-    String queueName = queue.getName();
-    HelixProperty property = new HelixProperty(queueName);
-    property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap());
-    boolean created =
-        _accessor.createProperty(_accessor.keyBuilder().resourceConfig(queueName), property);
-    if (!created) {
-      throw new IllegalArgumentException("Queue " + queueName + " already exists!");
-    }
+    start(queue);
   }
 
   /** Flushes a named job queue */
@@ -566,20 +559,35 @@ public class TaskDriver {
     setWorkflowTargetState(workflow, TargetState.DELETE);
   }
 
-  /** Helper function to change target state for a given workflow */
+  /**
+   * Helper function to change target state for a given workflow
+   */
   private void setWorkflowTargetState(String workflowName, TargetState state) {
     setSingleWorkflowTargetState(workflowName, state);
 
+    // TODO: this is the temporary fix for current task rebalance implementation.
+    // We should fix this in new task framework implementation.
+    List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
+    for (String resource : resources) {
+      if (resource.startsWith(workflowName)) {
+        setSingleWorkflowTargetState(resource, state);
+      }
+    }
+
+    /* TODO: use this code for new task framework.
     // For recurring schedules, last scheduled incomplete workflow must also be handled
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
     String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
-    WorkflowContext lastScheduledWorkflowCtx =
-        TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
-    if (lastScheduledWorkflowCtx != null &&
-        !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
-          || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
-      setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+    if (lastScheduledWorkflow != null) {
+      WorkflowContext lastScheduledWorkflowCtx =
+          TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+      if (lastScheduledWorkflowCtx != null && !(
+          lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+              || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+        setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+      }
     }
+    */
   }
 
   /** Helper function to change target state for a given workflow */

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index abd1882..7b17043 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -48,6 +48,7 @@ public class TaskRunner implements Runnable {
   // If true, indicates that the task has finished.
   private volatile boolean _done = false;
 
+
   public TaskRunner(Task task, String taskName, String taskPartition, String instance,
       HelixManager manager, String sessionId) {
     _task = task;
@@ -111,7 +112,9 @@ public class TaskRunner implements Runnable {
    * Signals the task to cancel itself.
    */
   public void cancel() {
-    _task.cancel();
+    if (!_done) {
+      _task.cancel();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 30939fc..525a38b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -205,22 +205,22 @@ public class TaskStateModel extends StateModel {
 
   @Transition(to = "INIT", from = "COMPLETED")
   public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "STOPPED")
   public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "TIMED_OUT")
   public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "TASK_ERROR")
   public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index f69605e..259b72c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -49,19 +49,19 @@ public class Workflow {
   public static final String UNSPECIFIED = "UNSPECIFIED";
 
   /** Workflow name */
-  private String _name;
+  protected String _name;
 
   /** Holds workflow-level configurations */
-  private WorkflowConfig _workflowConfig;
+  protected WorkflowConfig _workflowConfig;
 
   /** Contains the per-job configurations for all jobs specified in the provided dag */
-  private Map<String, Map<String, String>> _jobConfigs;
+  protected Map<String, Map<String, String>> _jobConfigs;
 
   /** Containers the per-job configurations of all individually-specified tasks */
-  private Map<String, List<TaskConfig>> _taskConfigs;
+  protected Map<String, List<TaskConfig>> _taskConfigs;
 
   /** Constructs and validates a workflow against a provided dag and config set */
-  private Workflow(String name, WorkflowConfig workflowConfig,
+  protected Workflow(String name, WorkflowConfig workflowConfig,
       Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
     _name = name;
     _workflowConfig = workflowConfig;
@@ -225,12 +225,14 @@ public class Workflow {
 
   /** Build a workflow incrementally from dependencies and single configs, validate at build time */
   public static class Builder {
-    private String _name;
-    private JobDag _dag;
-    private Map<String, Map<String, String>> _jobConfigs;
-    private Map<String, List<TaskConfig>> _taskConfigs;
-    private ScheduleConfig _scheduleConfig;
-    private long _expiry;
+    protected String _name;
+    protected JobDag _dag;
+    protected Map<String, Map<String, String>> _jobConfigs;
+    protected Map<String, List<TaskConfig>> _taskConfigs;
+    protected ScheduleConfig _scheduleConfig;
+    protected long _expiry;
+    protected Map<String, String> _cfgMap;
+    protected int _parallelJobs = -1;
 
     public Builder(String name) {
       _name = name;
@@ -287,6 +289,11 @@ public class Workflow {
       return this;
     }
 
+    public Builder fromMap(Map<String, String> cfg) {
+      _cfgMap = cfg;
+      return this;
+    }
+
     public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
       _scheduleConfig = scheduleConfig;
       return this;
@@ -301,13 +308,30 @@ public class Workflow {
       return TaskUtil.getNamespacedJobName(_name, job);
     }
 
+    public Builder parallelJobs(int parallelJobs) {
+      _parallelJobs = parallelJobs;
+      return this;
+    }
+
     public Workflow build() {
+      WorkflowConfig.Builder builder = buildWorkflowConfig();
+      // calls validate internally
+      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs);
+    }
+
+    protected WorkflowConfig.Builder buildWorkflowConfig() {
       for (String task : _jobConfigs.keySet()) {
         // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
         _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
       }
 
-      WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+      WorkflowConfig.Builder builder;
+      if (_cfgMap != null) {
+        builder = WorkflowConfig.Builder.fromMap(_cfgMap);
+      } else {
+        builder = new WorkflowConfig.Builder();
+      }
+
       builder.setJobDag(_dag);
       builder.setTargetState(TargetState.START);
       if (_scheduleConfig != null) {
@@ -316,8 +340,11 @@ public class Workflow {
       if (_expiry > 0) {
         builder.setExpiry(_expiry);
       }
-      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
-                                                                              // internally
+      if (_parallelJobs != -1) {
+        builder.setParallelJobs(_parallelJobs);
+      }
+
+      return builder;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index f15f235..56fba58 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -100,6 +100,20 @@ public class WorkflowConfig {
     return defaultDateFormat;
   }
 
+  /**
+   * Get the scheduled start time of the workflow.
+   *
+   * @return start time if the workflow schedule is set, null if no schedule config set.
+   */
+  public Date getStartTime() {
+    // Workflow with non-scheduled config is ready to schedule immediately.
+    if (_scheduleConfig == null) {
+      return null;
+    }
+
+    return _scheduleConfig.getStartTime();
+  }
+
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 1f17e92..a00a736 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -274,7 +274,6 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
     TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index eef1ce6..38c9113 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -179,7 +179,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     return cal.getTime();
   }
 
-  private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
     cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
@@ -191,11 +191,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     cfgMap.put(WorkflowConfig.START_TIME,
         WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
     //cfgMap.put(WorkflowConfig.START_TIME,
-        //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
-    return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
+    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
   }
 
-  private JobQueue buildRecurrentJobQueue(String jobQueueName) {
+
+  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
     return buildRecurrentJobQueue(jobQueueName, 0);
   }
 
@@ -205,9 +206,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
-
+    JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i <= 1; i++) {
@@ -218,10 +217,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
-      _driver.enqueueJob(queueName, jobName, job);
+      queueBuild.enqueueJob(jobName, job);
       currentJobNames.add(jobName);
     }
 
+    _driver.start(queueBuild.build());
+
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure job 1 is started before stop it
@@ -234,8 +235,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.delete(queueName);
     Thread.sleep(500);
 
-    queue = buildRecurrentJobQueue(queueName, 5);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -245,10 +245,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
-      _driver.enqueueJob(queueName, jobName, job);
+      queueBuilder.enqueueJob(jobName, job);
       currentJobNames.add(jobName);
     }
 
+    _driver.createQueue(queueBuilder.build());
+
+
     wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure jobs are started and completed
@@ -269,12 +272,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    Thread.sleep(100);
     for (int i = 0; i <= 4; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
@@ -285,9 +288,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
       LOG.info("Enqueuing job: " + jobName);
-      _driver.enqueueJob(queueName, jobName, job);
+      queueBuilder.enqueueJob(jobName, job);
       currentJobNames.add(i, jobName);
     }
+    _driver.createQueue(queueBuilder.build());
 
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
@@ -360,8 +364,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
 
     // create jobs
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -369,7 +372,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
 
     final int JOB_COUNTS = 3;
-
     for (int i = 0; i < JOB_COUNTS; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
@@ -384,8 +386,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     // enqueue all jobs except last one
     for (int i = 0; i < JOB_COUNTS - 1; ++i) {
       LOG.info("Enqueuing job: " + jobNames.get(i));
-      _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i));
+      queueBuilder.enqueueJob(jobNames.get(i), jobs.get(i));
     }
+
+    _driver.createQueue(queueBuilder.build());
+
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
@@ -398,6 +403,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
     _driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1));
+    _driver.stop(queueName);
 
     // remove the last job
     _driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1));
@@ -413,8 +419,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
 
     // create jobs
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
@@ -431,8 +436,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
             .setTargetPartitionStates(Sets.newHashSet("MASTER"));
 
     // enqueue both jobs
-    _driver.enqueueJob(queueName, "job1", job1);
-    _driver.enqueueJob(queueName, "job2", job2);
+    queueBuilder.enqueueJob("job1", job1);
+    queueBuilder.enqueueJob("job2", job2);
+
+    _driver.createQueue(queueBuilder.build());
+
 
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index f6fc53a..2ff8c56 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -156,7 +156,9 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
 
     String queueName = TestHelper.getTestMethodName();
 
-    JobQueue queue = new JobQueue.Builder(queueName).parallelJobs(PARALLEL_COUNT).build();
+    JobQueue.Builder queueBuild = new JobQueue.Builder(queueName);
+    queueBuild.parallelJobs(PARALLEL_COUNT);
+    JobQueue queue = queueBuild.build();
     _driver.createQueue(queue);
 
     List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();