You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/11/30 18:19:04 UTC

[1/3] helix git commit: [HELIX-614] Fix the bug when job expiry time is shorter than job schedule interval in recurring job queue.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 456ddb0c4 -> 7bbb20be6


[HELIX-614] Fix the bug when job expiry time is shorter than job schedule interval in recurring job queue.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d129d3ab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d129d3ab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d129d3ab

Branch: refs/heads/helix-0.6.x
Commit: d129d3ab780adb1ff41fe5a0bfb3dafd7d5068a3
Parents: 456ddb0
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:38:31 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:38:31 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   |  1 +
 .../org/apache/helix/task/TaskRebalancer.java   | 35 +++++++++++++++++---
 .../java/org/apache/helix/task/TaskUtil.java    |  1 +
 .../java/org/apache/helix/task/Workflow.java    | 17 +++++++---
 .../integration/task/TestRecurringJobQueue.java | 32 +++++++++++++++---
 5 files changed, 74 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 30f76b7..c7c2f38 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -217,6 +217,7 @@ public class JobConfig {
     cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
     cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
     cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
+    cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
     return cfgMap;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2ff8b8c..5a86c3d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -113,10 +114,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
     final String resourceName = resource.getResourceName();
+    LOG.debug("Computer Best Partition for resource: " + resourceName);
 
     // Fetch job configuration
     JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
     if (jobCfg == null) {
+      LOG.debug("Job configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }
     String workflowResource = jobCfg.getWorkflow();
@@ -124,6 +127,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Fetch workflow configuration and context
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
     if (workflowCfg == null) {
+      LOG.debug("Workflow configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
@@ -132,6 +136,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     if (workflowCtx == null) {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
       workflowCtx.setStartTime(System.currentTimeMillis());
+      LOG.info("Workflow context for " + resourceName + " created!");
     }
 
     // check ancestor job status
@@ -147,12 +152,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+      LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }
 
     // Clean up if workflow marked for deletion
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
+      LOG.info(
+          "Workflow is marked as deleted " + workflowResource
+              + " cleaning up the workflow context.");
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName, currStateOutput);
     }
@@ -160,6 +169,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Check if this workflow has been finished past its expiry.
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
         && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      LOG.info("Workflow " + workflowResource
+          + " is completed and passed expiry time, cleaning up the workflow context.");
       markForDeletion(_manager, workflowResource);
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName, currStateOutput);
@@ -176,6 +187,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     long jobFinishTime = jobCtx.getFinishTime();
     if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
         && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      LOG.info("Job " + resourceName
+          + " is completed and passed expiry time, cleaning up the job context.");
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName, currStateOutput);
     }
@@ -183,6 +196,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // The job is already in a final state (completed/failed).
     if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
         || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
+      LOG.debug("Job " + resourceName + " is failed or already completed.");
       return emptyAssignment(resourceName, currStateOutput);
     }
 
@@ -190,6 +204,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     boolean isReady =
         scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
     if (!isReady) {
+      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
       return emptyAssignment(resourceName, currStateOutput);
     }
 
@@ -224,6 +239,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
     TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
 
+    LOG.debug("Job " + resourceName + " new assignment " + Arrays
+        .toString(newAssignment.getMappedPartitions().toArray()));
+
     return newAssignment;
   }
 
@@ -529,6 +547,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       // Remove any timers that are past-time for this workflow
       Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
       if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+        LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
         SCHEDULED_TIMES.remove(workflowResource);
       }
 
@@ -536,6 +555,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       if (scheduleConfig.isRecurring()) {
         // Skip scheduling this workflow if it's not in a start state
         if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          LOG.debug(
+              "Skip scheduling since the workflow has not been started " + workflowResource);
           return false;
         }
 
@@ -543,8 +564,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
         if (lastScheduled != null) {
           WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
-          if (lastWorkflowCtx == null
-              || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+          if (lastWorkflowCtx != null
+              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
             return false;
           }
         }
@@ -559,7 +581,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         // Now clone the workflow if this clone has not yet been created
         String newWorkflowName =
             workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
-        if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) {
+        LOG.debug("Ready to start workflow " + newWorkflowName);
+        if (!newWorkflowName.equals(lastScheduled)) {
           Workflow clonedWf =
               TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
                   timeToSchedule));
@@ -592,9 +615,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Do nothing if there is already a timer set for the this workflow with the same start time.
     if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
         || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
+      LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
       return;
     }
-    LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource);
+    LOG.info(
+        "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
+            + " delay from start: " + delayFromStart);
 
     // For workflows not yet scheduled, schedule them and record it
     RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
@@ -607,6 +633,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     long currentTime = now;
     Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
     if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+      LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
       SCHEDULED_TIMES.remove(jobResource);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 2235b80..bb62de5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -349,6 +349,7 @@ public class TaskUtil {
     IdealState is = accessor.getProperty(key);
     if (is != null) {
       accessor.updateProperty(key, is);
+      LOG.debug("invoke rebalance for " + key);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 4ca6e68..f69605e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -26,8 +26,10 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.helix.task.beans.JobBean;
@@ -192,10 +194,17 @@ public class Workflow {
    */
   public void validate() {
     // validate dag and configs
-    if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) {
-      throw new IllegalArgumentException("Nodes specified in DAG missing from config");
-    } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) {
-      throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
+    Set<String> jobNamesInConfig = new HashSet<String>(_jobConfigs.keySet());
+    Set<String> jobNamesInDag = new HashSet<String>(_workflowConfig.getJobDag().getAllNodes());
+    if (!jobNamesInConfig.equals(jobNamesInDag)) {
+      Set<String> jobNamesInConfigButNotInDag = new HashSet<String>(jobNamesInConfig);
+      jobNamesInConfigButNotInDag.removeAll(jobNamesInDag);
+      Set<String> jobNamesInDagButNotInConfig = new HashSet<String>(jobNamesInDag);
+      jobNamesInDagButNotInConfig.removeAll(jobNamesInConfig);
+
+      throw new IllegalArgumentException(
+          "Job Names dismatch. Names in config but not in dag: " + jobNamesInConfigButNotInDag +
+          ", names in dag but not in config: " + jobNamesInDagButNotInConfig);
     }
 
     _workflowConfig.getJobDag().validate();

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 4656a23..deca8a7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.task;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;;
@@ -157,10 +158,31 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
+  private Date getDateFromStartTime(String startTime)
+  {
+    int splitIndex = startTime.indexOf(':');
+    int hourOfDay = 0, minutes = 0;
+    try
+    {
+      hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
+      minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
+    }
+    catch (NumberFormatException e)
+    {
+
+    }
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
+    cal.set(Calendar.MINUTE, minutes);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTime();
+  }
+
   private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(500000));
-    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(120));
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
     cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -168,6 +190,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     cal.set(Calendar.MILLISECOND, 0);
     cfgMap.put(WorkflowConfig.START_TIME,
         WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    //cfgMap.put(WorkflowConfig.START_TIME,
+        //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
     return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
   }
 
@@ -186,7 +210,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i <= 2; i++) {
+    for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
@@ -213,7 +237,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     queue = buildRecurrentJobQueue(queueName, 5);
     _driver.createQueue(queue);
     currentJobNames.clear();
-    for (int i = 0; i <= 2; i++) {
+    for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =


[2/3] helix git commit: [HELIX-615] Naming problem of scheduled jobs from recurrent queue.

Posted by ki...@apache.org.
[HELIX-615] Naming problem of scheduled jobs from recurrent queue.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7569a0a7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7569a0a7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7569a0a7

Branch: refs/heads/helix-0.6.x
Commit: 7569a0a7b999fb6675447919d91e756200364ff5
Parents: d129d3a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:50:30 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:50:30 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskConstants.java   |  4 ----
 .../main/java/org/apache/helix/task/TaskDriver.java | 16 +++++++++-------
 .../java/org/apache/helix/task/TaskRebalancer.java  |  7 +++++--
 .../integration/task/TestRecurringJobQueue.java     |  2 +-
 4 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 34008d6..305323d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -39,8 +39,4 @@ public class TaskConstants {
    * The root property store path at which the {@link TaskRebalancer} stores context information.
    */
   public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
-  /**
-   * Resource prefix for scheduled workflows
-   */
-  public static final String SCHEDULED = "SCHEDULED";
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index b4b94f8..cc1eac1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -570,13 +570,15 @@ public class TaskDriver {
   private void setWorkflowTargetState(String workflowName, TargetState state) {
     setSingleWorkflowTargetState(workflowName, state);
 
-    // For recurring schedules, child workflows must also be handled
-    List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
-    String prefix = workflowName + "_" + TaskConstants.SCHEDULED;
-    for (String resource : resources) {
-      if (resource.startsWith(prefix)) {
-        setSingleWorkflowTargetState(resource, state);
-      }
+    // For recurring schedules, last scheduled incomplete workflow must also be handled
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
+    String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+    WorkflowContext lastScheduledWorkflowCtx =
+        TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+    if (lastScheduledWorkflowCtx != null &&
+        !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+          || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+      setSingleWorkflowTargetState(lastScheduledWorkflow, state);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 5a86c3d..3842b66 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,6 +19,8 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -579,8 +581,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         long timeToSchedule = period * offsetMultiplier + startTime.getTime();
 
         // Now clone the workflow if this clone has not yet been created
-        String newWorkflowName =
-            workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
+        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
+        // Now clone the workflow if this clone has not yet been created
+        String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
         LOG.debug("Ready to start workflow " + newWorkflowName);
         if (!newWorkflowName.equals(lastScheduled)) {
           Workflow clonedWf =

http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index deca8a7..eef1ce6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;;
+import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;


[3/3] helix git commit: [HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.

Posted by ki...@apache.org.
[HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bbb20be
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bbb20be
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bbb20be

Branch: refs/heads/helix-0.6.x
Commit: 7bbb20be67a939a57f33d8f6d7c814b1dc246575
Parents: 7569a0a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:54:34 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:54:34 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobQueue.java    | 66 ++++++++++----------
 .../java/org/apache/helix/task/TaskDriver.java  | 42 ++++++++-----
 .../java/org/apache/helix/task/TaskRunner.java  |  5 +-
 .../org/apache/helix/task/TaskStateModel.java   |  8 +--
 .../java/org/apache/helix/task/Workflow.java    | 55 +++++++++++-----
 .../org/apache/helix/task/WorkflowConfig.java   | 14 +++++
 .../task/TestIndependentTaskRebalancer.java     |  1 -
 .../integration/task/TestRecurringJobQueue.java | 52 ++++++++-------
 .../task/TestTaskRebalancerParallel.java        |  4 +-
 9 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index bca5911..0280c88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -19,31 +19,26 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
  * A named queue to which jobs can be added
  */
-public class JobQueue extends WorkflowConfig {
+public class JobQueue extends Workflow {
   /* Config fields */
   public static final String CAPACITY = "CAPACITY";
 
-  private final String _name;
   private final int _capacity;
 
-  private JobQueue(String name, int capacity, WorkflowConfig config) {
-    super(config.getJobDag(), config.getParallelJobs(), config.getTargetState(), config.getExpiry(),
-        config.isTerminable(), config.getScheduleConfig());
-    _name = name;
+  private JobQueue(String name, int capacity, WorkflowConfig workflowConfig,
+      Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
+    super(name, workflowConfig, jobConfigs, taskConfigs);
     _capacity = capacity;
-  }
-
-  /**
-   * Get the name of this queue
-   * @return queue name
-   */
-  public String getName() {
-    return _name;
+    validate();
   }
 
   /**
@@ -54,31 +49,24 @@ public class JobQueue extends WorkflowConfig {
     return _capacity;
   }
 
-  @Override
   public Map<String, String> getResourceConfigMap() throws Exception {
-    Map<String, String> cfgMap = super.getResourceConfigMap();
+    Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap();
     cfgMap.put(CAPACITY, String.valueOf(_capacity));
     return cfgMap;
   }
 
-  /** Supports creation of a single empty queue */
-  public static class Builder {
-    private WorkflowConfig.Builder _builder;
-    private final String _name;
+  /** Supports creation of a single queue */
+  public static class Builder extends Workflow.Builder {
     private int _capacity = Integer.MAX_VALUE;
+    private List<String> jobs;
 
     public Builder(String name) {
-      _builder = new WorkflowConfig.Builder();
-      _name = name;
-    }
-
-    public Builder parallelJobs(int parallelJobs) {
-      _builder.setParallelJobs(parallelJobs);
-      return this;
+      super(name);
+      jobs = new ArrayList<String>();
     }
 
     public Builder expiry(long expiry) {
-      _builder.setExpiry(expiry);
+      _expiry = expiry;
       return this;
     }
 
@@ -87,18 +75,32 @@ public class JobQueue extends WorkflowConfig {
       return this;
     }
 
+    @Override
     public Builder fromMap(Map<String, String> cfg) {
-      _builder = WorkflowConfig.Builder.fromMap(cfg);
+      super.fromMap(cfg);
       if (cfg.containsKey(CAPACITY)) {
         _capacity = Integer.parseInt(cfg.get(CAPACITY));
       }
       return this;
     }
 
+    public void enqueueJob(final String job, JobConfig.Builder jobBuilder) {
+      if (jobs.size() >= _capacity) {
+        throw new HelixException("Failed to push new job to jobQueue, it is already full");
+      }
+      addJobConfig(job, jobBuilder);
+      if (jobs.size() > 0) {
+        String previousJob = jobs.get(jobs.size() - 1);
+        addParentChildDependency(previousJob, job);
+      }
+      jobs.add(job);
+    }
+
     public JobQueue build() {
-      _builder.setTerminable(false);
-      WorkflowConfig workflowConfig = _builder.build();
-      return new JobQueue(_name, _capacity, workflowConfig);
+      WorkflowConfig.Builder builder = buildWorkflowConfig();
+      builder.setTerminable(false);
+      WorkflowConfig workflowConfig = builder.build();
+      return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index cc1eac1..654ba4e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -191,8 +191,8 @@ public class TaskDriver {
     String flowName = flow.getName();
 
     // first, add workflow config to ZK
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), flow
-        .getWorkflowConfig().getResourceConfigMap());
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+        flow.getWorkflowConfig().getResourceConfigMap());
 
     // then schedule jobs
     for (String job : flow.getJobConfigs().keySet()) {
@@ -206,14 +206,7 @@ public class TaskDriver {
 
   /** Creates a new named job queue (workflow) */
   public void createQueue(JobQueue queue) throws Exception {
-    String queueName = queue.getName();
-    HelixProperty property = new HelixProperty(queueName);
-    property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap());
-    boolean created =
-        _accessor.createProperty(_accessor.keyBuilder().resourceConfig(queueName), property);
-    if (!created) {
-      throw new IllegalArgumentException("Queue " + queueName + " already exists!");
-    }
+    start(queue);
   }
 
   /** Flushes a named job queue */
@@ -566,20 +559,35 @@ public class TaskDriver {
     setWorkflowTargetState(workflow, TargetState.DELETE);
   }
 
-  /** Helper function to change target state for a given workflow */
+  /**
+   * Helper function to change target state for a given workflow
+   */
   private void setWorkflowTargetState(String workflowName, TargetState state) {
     setSingleWorkflowTargetState(workflowName, state);
 
+    // TODO: this is the temporary fix for current task rebalance implementation.
+    // We should fix this in new task framework implementation.
+    List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
+    for (String resource : resources) {
+      if (resource.startsWith(workflowName)) {
+        setSingleWorkflowTargetState(resource, state);
+      }
+    }
+
+    /* TODO: use this code for new task framework.
     // For recurring schedules, last scheduled incomplete workflow must also be handled
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
     String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
-    WorkflowContext lastScheduledWorkflowCtx =
-        TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
-    if (lastScheduledWorkflowCtx != null &&
-        !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
-          || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
-      setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+    if (lastScheduledWorkflow != null) {
+      WorkflowContext lastScheduledWorkflowCtx =
+          TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+      if (lastScheduledWorkflowCtx != null && !(
+          lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+              || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+        setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+      }
     }
+    */
   }
 
   /** Helper function to change target state for a given workflow */

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index abd1882..7b17043 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -48,6 +48,7 @@ public class TaskRunner implements Runnable {
   // If true, indicates that the task has finished.
   private volatile boolean _done = false;
 
+
   public TaskRunner(Task task, String taskName, String taskPartition, String instance,
       HelixManager manager, String sessionId) {
     _task = task;
@@ -111,7 +112,9 @@ public class TaskRunner implements Runnable {
    * Signals the task to cancel itself.
    */
   public void cancel() {
-    _task.cancel();
+    if (!_done) {
+      _task.cancel();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 30939fc..525a38b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -205,22 +205,22 @@ public class TaskStateModel extends StateModel {
 
   @Transition(to = "INIT", from = "COMPLETED")
   public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "STOPPED")
   public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "TIMED_OUT")
   public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "TASK_ERROR")
   public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index f69605e..259b72c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -49,19 +49,19 @@ public class Workflow {
   public static final String UNSPECIFIED = "UNSPECIFIED";
 
   /** Workflow name */
-  private String _name;
+  protected String _name;
 
   /** Holds workflow-level configurations */
-  private WorkflowConfig _workflowConfig;
+  protected WorkflowConfig _workflowConfig;
 
   /** Contains the per-job configurations for all jobs specified in the provided dag */
-  private Map<String, Map<String, String>> _jobConfigs;
+  protected Map<String, Map<String, String>> _jobConfigs;
 
   /** Containers the per-job configurations of all individually-specified tasks */
-  private Map<String, List<TaskConfig>> _taskConfigs;
+  protected Map<String, List<TaskConfig>> _taskConfigs;
 
   /** Constructs and validates a workflow against a provided dag and config set */
-  private Workflow(String name, WorkflowConfig workflowConfig,
+  protected Workflow(String name, WorkflowConfig workflowConfig,
       Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
     _name = name;
     _workflowConfig = workflowConfig;
@@ -225,12 +225,14 @@ public class Workflow {
 
   /** Build a workflow incrementally from dependencies and single configs, validate at build time */
   public static class Builder {
-    private String _name;
-    private JobDag _dag;
-    private Map<String, Map<String, String>> _jobConfigs;
-    private Map<String, List<TaskConfig>> _taskConfigs;
-    private ScheduleConfig _scheduleConfig;
-    private long _expiry;
+    protected String _name;
+    protected JobDag _dag;
+    protected Map<String, Map<String, String>> _jobConfigs;
+    protected Map<String, List<TaskConfig>> _taskConfigs;
+    protected ScheduleConfig _scheduleConfig;
+    protected long _expiry;
+    protected Map<String, String> _cfgMap;
+    protected int _parallelJobs = -1;
 
     public Builder(String name) {
       _name = name;
@@ -287,6 +289,11 @@ public class Workflow {
       return this;
     }
 
+    public Builder fromMap(Map<String, String> cfg) {
+      _cfgMap = cfg;
+      return this;
+    }
+
     public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
       _scheduleConfig = scheduleConfig;
       return this;
@@ -301,13 +308,30 @@ public class Workflow {
       return TaskUtil.getNamespacedJobName(_name, job);
     }
 
+    public Builder parallelJobs(int parallelJobs) {
+      _parallelJobs = parallelJobs;
+      return this;
+    }
+
     public Workflow build() {
+      WorkflowConfig.Builder builder = buildWorkflowConfig();
+      // calls validate internally
+      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs);
+    }
+
+    protected WorkflowConfig.Builder buildWorkflowConfig() {
       for (String task : _jobConfigs.keySet()) {
         // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
         _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
       }
 
-      WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+      WorkflowConfig.Builder builder;
+      if (_cfgMap != null) {
+        builder = WorkflowConfig.Builder.fromMap(_cfgMap);
+      } else {
+        builder = new WorkflowConfig.Builder();
+      }
+
       builder.setJobDag(_dag);
       builder.setTargetState(TargetState.START);
       if (_scheduleConfig != null) {
@@ -316,8 +340,11 @@ public class Workflow {
       if (_expiry > 0) {
         builder.setExpiry(_expiry);
       }
-      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
-                                                                              // internally
+      if (_parallelJobs != -1) {
+        builder.setParallelJobs(_parallelJobs);
+      }
+
+      return builder;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index f15f235..56fba58 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -100,6 +100,20 @@ public class WorkflowConfig {
     return defaultDateFormat;
   }
 
+  /**
+   * Get the scheduled start time of the workflow.
+   *
+   * @return start time if the workflow schedule is set, null if no schedule config set.
+   */
+  public Date getStartTime() {
+    // Workflow with non-scheduled config is ready to schedule immediately.
+    if (_scheduleConfig == null) {
+      return null;
+    }
+
+    return _scheduleConfig.getStartTime();
+  }
+
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 1f17e92..a00a736 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -274,7 +274,6 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
     TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index eef1ce6..38c9113 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -179,7 +179,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     return cal.getTime();
   }
 
-  private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
     cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
@@ -191,11 +191,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     cfgMap.put(WorkflowConfig.START_TIME,
         WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
     //cfgMap.put(WorkflowConfig.START_TIME,
-        //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
-    return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
+    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
   }
 
-  private JobQueue buildRecurrentJobQueue(String jobQueueName) {
+
+  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
     return buildRecurrentJobQueue(jobQueueName, 0);
   }
 
@@ -205,9 +206,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
-
+    JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i <= 1; i++) {
@@ -218,10 +217,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
-      _driver.enqueueJob(queueName, jobName, job);
+      queueBuild.enqueueJob(jobName, job);
       currentJobNames.add(jobName);
     }
 
+    _driver.start(queueBuild.build());
+
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure job 1 is started before stop it
@@ -234,8 +235,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.delete(queueName);
     Thread.sleep(500);
 
-    queue = buildRecurrentJobQueue(queueName, 5);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -245,10 +245,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
-      _driver.enqueueJob(queueName, jobName, job);
+      queueBuilder.enqueueJob(jobName, job);
       currentJobNames.add(jobName);
     }
 
+    _driver.createQueue(queueBuilder.build());
+
+
     wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure jobs are started and completed
@@ -269,12 +272,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    Thread.sleep(100);
     for (int i = 0; i <= 4; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
@@ -285,9 +288,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
       LOG.info("Enqueuing job: " + jobName);
-      _driver.enqueueJob(queueName, jobName, job);
+      queueBuilder.enqueueJob(jobName, job);
       currentJobNames.add(i, jobName);
     }
+    _driver.createQueue(queueBuilder.build());
 
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
@@ -360,8 +364,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
 
     // create jobs
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -369,7 +372,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
 
     final int JOB_COUNTS = 3;
-
     for (int i = 0; i < JOB_COUNTS; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
@@ -384,8 +386,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     // enqueue all jobs except last one
     for (int i = 0; i < JOB_COUNTS - 1; ++i) {
       LOG.info("Enqueuing job: " + jobNames.get(i));
-      _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i));
+      queueBuilder.enqueueJob(jobNames.get(i), jobs.get(i));
     }
+
+    _driver.createQueue(queueBuilder.build());
+
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
@@ -398,6 +403,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
     _driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1));
+    _driver.stop(queueName);
 
     // remove the last job
     _driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1));
@@ -413,8 +419,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue queue = buildRecurrentJobQueue(queueName);
-    _driver.createQueue(queue);
+    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
 
     // create jobs
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
@@ -431,8 +436,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
             .setTargetPartitionStates(Sets.newHashSet("MASTER"));
 
     // enqueue both jobs
-    _driver.enqueueJob(queueName, "job1", job1);
-    _driver.enqueueJob(queueName, "job2", job2);
+    queueBuilder.enqueueJob("job1", job1);
+    queueBuilder.enqueueJob("job2", job2);
+
+    _driver.createQueue(queueBuilder.build());
+
 
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();

http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index f6fc53a..2ff8c56 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -156,7 +156,9 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
 
     String queueName = TestHelper.getTestMethodName();
 
-    JobQueue queue = new JobQueue.Builder(queueName).parallelJobs(PARALLEL_COUNT).build();
+    JobQueue.Builder queueBuild = new JobQueue.Builder(queueName);
+    queueBuild.parallelJobs(PARALLEL_COUNT);
+    JobQueue queue = queueBuild.build();
     _driver.createQueue(queue);
 
     List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();