Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:07 UTC

[11/33] helix git commit: Refactor Workflow and JobQueue builders to make the builder API cleaner.

Refactor Workflow and JobQueue builders to make the builder API cleaner.
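
For illustration, a minimal sketch of the refactored API (method names as they
appear in this patch; the workflow, job, and resource names are hypothetical):

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.Workflow;
    import org.apache.helix.task.WorkflowConfig;

    // Workflow-level settings now live on WorkflowConfig.Builder rather than
    // on ad-hoc fields of Workflow.Builder.
    WorkflowConfig workflowCfg = new WorkflowConfig.Builder()
        .setExpiry(60 * 60 * 1000)      // keep workflow results for one hour
        .setParallelJobs(2)             // allow two jobs to run concurrently
        .build();

    JobConfig.Builder backupJob = new JobConfig.Builder()
        .setCommand("Backup")           // task command registered on participants
        .setTargetResource("myDB");

    Workflow workflow = new Workflow.Builder("myWorkflow")
        .setWorkflowConfig(workflowCfg)
        .addJob("backupJob", backupJob) // addJob() supersedes addJobConfig()
        .build();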


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d386aff3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d386aff3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d386aff3

Branch: refs/heads/helix-0.6.x
Commit: d386aff394f2e4e7202f13fe2ed5e6533a8cfb29
Parents: 66dba1f
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 23 17:06:35 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:48:35 2016 -0700

----------------------------------------------------------------------
 .../webapp/resources/JobQueuesResource.java     |   8 +-
 .../webapp/resources/WorkflowsResource.java     |   4 +-
 .../helix/task/DeprecatedTaskRebalancer.java    |  21 +-
 .../java/org/apache/helix/task/JobConfig.java   | 144 ++++++-------
 .../main/java/org/apache/helix/task/JobDag.java |   5 +-
 .../java/org/apache/helix/task/JobQueue.java    |  98 +++++----
 .../java/org/apache/helix/task/TaskDriver.java  |  36 ++--
 .../java/org/apache/helix/task/TaskUtil.java    |  35 ----
 .../java/org/apache/helix/task/Workflow.java    | 156 ++++++++------
 .../org/apache/helix/task/WorkflowConfig.java   | 207 +++++++++++++++----
 .../apache/helix/task/WorkflowRebalancer.java   |  43 ++--
 .../helix/integration/task/TaskTestUtil.java    |  48 +++--
 .../integration/task/TestRecurringJobQueue.java |  54 ++++-
 .../task/TestTaskRebalancerParallel.java        |  11 +-
 .../integration/task/TestUpdateWorkflow.java    |  99 +++++++--
 .../integration/task/WorkflowGenerator.java     |   9 -
 16 files changed, 605 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
index 1a5cb17..e0a0657 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
@@ -103,9 +103,11 @@ public class JobQueuesResource extends ServerResource {
       Map.Entry<String, HelixProperty> e = it.next();
       HelixProperty resource = e.getValue();
       Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
-      boolean isTerminable = resource.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
-      if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
-          || !simpleFields.containsKey(WorkflowConfig.DAG) || isTerminable) {
+      boolean isTerminable = resource.getRecord()
+          .getBooleanField(WorkflowConfig.WorkflowConfigProperty.Terminable.name(), true);
+      if (!simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.TargetState.name())
+          || !simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())
+          || isTerminable) {
         it.remove();
       }
     }

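This hunk shows the pattern used throughout the patch: the old string
constants are replaced by the WorkflowConfig.WorkflowConfigProperty enum.
Enum.name() returns the constant's identifier, and the identifiers were chosen
to match the old string values, so the simple-field keys stored in ZooKeeper
are unchanged. Roughly:

    // The enum identifiers match the old constants ("TargetState", "Dag",
    // "Terminable", ...), so records already persisted keep parsing.
    String key = WorkflowConfig.WorkflowConfigProperty.TargetState.name(); // "TargetState"
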
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
index 9175530..c517fea 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -96,8 +96,8 @@ public class WorkflowsResource extends ServerResource {
       Map.Entry<String, HelixProperty> e = it.next();
       HelixProperty resource = e.getValue();
       Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
-      if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
-          || !simpleFields.containsKey(WorkflowConfig.DAG)) {
+      if (!simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.TargetState.name())
+          || !simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
         it.remove();
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 6f744f0..855312b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -673,19 +673,21 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     // Create a new workflow with a new name
     HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
     Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
-    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
+    JobDag jobDag =
+        JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
     Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
     Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
 
     // Set the workflow expiry
-    builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+    builder.setExpiry(
+        Long.parseLong(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Expiry.name())));
 
     // Set the schedule, if applicable
     ScheduleConfig scheduleConfig;
     if (newStartTime != null) {
       scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
     } else {
-      scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields);
+      scheduleConfig = WorkflowConfig.parseScheduleFromConfigMap(wfSimpleFields);
     }
     if (scheduleConfig != null) {
       builder.setScheduleConfig(scheduleConfig);
@@ -699,7 +701,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
         String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
         HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
         Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-        jobSimpleFields.put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), newWorkflowName); // overwrite workflow name
+        jobSimpleFields.put(JobConfig.JobConfigProperty.WorkflowID.name(), newWorkflowName); // overwrite workflow name
         for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
           builder.addConfig(job, e.getKey(), e.getValue());
         }
@@ -746,7 +748,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     long currentTime = now;
     Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
     if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-      LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
+      LOG.debug(
+          "Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
       SCHEDULED_TIMES.remove(jobResource);
     }
 
@@ -831,7 +834,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
   private static void markForDeletion(HelixManager mgr, String resourceName) {
     mgr.getConfigAccessor().set(
         TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
+        WorkflowConfig.WorkflowConfigProperty.TargetState.name(), TargetState.DELETE.name());
   }
 
   /**
@@ -848,7 +851,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        JobDag jobDag = JobDag
+            .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
         for (String child : jobDag.getDirectChildren(resourceName)) {
           jobDag.getChildrenToParents().get(child).remove(resourceName);
         }
@@ -859,7 +863,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
         jobDag.getParentsToChildren().remove(resourceName);
         jobDag.getAllNodes().remove(resourceName);
         try {
-          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+          currentData
+              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
           LOG.equals("Could not update DAG for job: " + resourceName);
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 65a9caf..d423d38 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -44,81 +44,71 @@ public class JobConfig {
     /**
      * The name of the workflow to which the job belongs.
      */
-    WORKFLOW_ID("WorkflowID"),
+    WorkflowID,
     /**
      * The assignment strategy of this job
      */
-    ASSIGNMENT_STRATEGY("AssignmentStrategy"),
+    AssignmentStrategy,
     /**
      * The name of the target resource.
      */
-    TARGET_RESOURCE("TargetResource"),
+    TargetResource,
     /**
      * The set of the target partition states. The value must be a comma-separated list of partition
      * states.
      */
-    TARGET_PARTITION_STATES("TargetPartitionStates"),
+    TargetPartitionStates,
     /**
      * The set of the target partition ids. The value must be a comma-separated list of partition ids.
      */
-    TARGET_PARTITIONS("TargetPartitions"),
+    TargetPartitions,
     /**
      * The command that is to be run by participants in the case of identical tasks.
      */
-    COMMAND("Command"),
+    Command,
     /**
      * The command configuration to be used by the tasks.
      */
-    JOB_COMMAND_CONFIG_MAP("JobCommandConfig"),
+    JobCommandConfig,
     /**
      * The timeout for a task.
      */
-    TIMEOUT_PER_TASK("TimeoutPerPartition"),
+    TimeoutPerPartition,
     /**
      * The maximum number of times the task rebalancer may attempt to execute a task.
      */
-    MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"),
+    MaxAttemptsPerTask,
     /**
      * The maximum number of times Helix will intentionally move a failing task
      */
-    MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"),
+    MaxForcedReassignmentsPerTask,
     /**
      * The number of concurrent tasks that are allowed to run on an instance.
      */
-    NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"),
+    ConcurrentTasksPerInstance,
     /**
      * The number of tasks within the job that are allowed to fail.
      */
-    FAILURE_THRESHOLD("FailureThreshold"),
+    FailureThreshold,
     /**
      * The amount of time in ms to wait before retrying a task
      */
-    TASK_RETRY_DELAY("TaskRetryDelay"),
+    TaskRetryDelay,
 
     /**
      * Whether failure of directly dependent jobs should fail this job.
      */
-    IGNORE_DEPENDENT_JOB_FAILURE("IgnoreDependentJobFailure"),
+    IgnoreDependentJobFailure,
 
     /**
      * The individual task configurations, if any *
      */
-    TASK_CONFIGS("TaskConfigs"),
+    TaskConfigs,
 
     /**
      * Disable external view (not showing) for this job resource
      */
-    DISABLE_EXTERNALVIEW("DisableExternalView");
-
-    private final String _value;
-
-    private JobConfigProperty(String val) {
-      _value = val;
-    }
-
-    public String value() {
-      return _value;
-    }
+    DisableExternalView
   }
 
   //Default property values
@@ -237,40 +227,40 @@ public class JobConfig {
 
   public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow);
+    cfgMap.put(JobConfigProperty.WorkflowID.name(), _workflow);
     if (_command != null) {
-      cfgMap.put(JobConfigProperty.COMMAND.value(), _command);
+      cfgMap.put(JobConfigProperty.Command.name(), _command);
     }
     if (_jobCommandConfigMap != null) {
       String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
       if (serializedConfig != null) {
-        cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig);
+        cfgMap.put(JobConfigProperty.JobCommandConfig.name(), serializedConfig);
       }
     }
     if (_targetResource != null) {
-      cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource);
+      cfgMap.put(JobConfigProperty.TargetResource.name(), _targetResource);
     }
     if (_targetPartitionStates != null) {
-      cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(),
+      cfgMap.put(JobConfigProperty.TargetPartitionStates.name(),
           Joiner.on(",").join(_targetPartitionStates));
     }
     if (_targetPartitions != null) {
       cfgMap
-          .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions));
+          .put(JobConfigProperty.TargetPartitions.name(), Joiner.on(",").join(_targetPartitions));
     }
     if (_retryDelay > 0) {
-      cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay);
+      cfgMap.put(JobConfigProperty.TaskRetryDelay.name(), "" + _retryDelay);
     }
-    cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask);
-    cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask);
-    cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+    cfgMap.put(JobConfigProperty.TimeoutPerPartition.name(), "" + _timeoutPerTask);
+    cfgMap.put(JobConfigProperty.MaxAttemptsPerTask.name(), "" + _maxAttemptsPerTask);
+    cfgMap.put(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
         "" + _maxForcedReassignmentsPerTask);
-    cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold);
-    cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(),
+    cfgMap.put(JobConfigProperty.FailureThreshold.name(), "" + _failureThreshold);
+    cfgMap.put(JobConfigProperty.DisableExternalView.name(),
         Boolean.toString(_disableExternalView));
-    cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+    cfgMap.put(JobConfigProperty.ConcurrentTasksPerInstance.name(),
         "" + _numConcurrentTasksPerInstance);
-    cfgMap.put(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value(),
+    cfgMap.put(JobConfigProperty.IgnoreDependentJobFailure.name(),
         Boolean.toString(_ignoreDependentJobFailure));
     return cfgMap;
   }
@@ -312,56 +302,56 @@ public class JobConfig {
      */
     public static Builder fromMap(Map<String, String> cfg) {
       Builder b = new Builder();
-      if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) {
-        b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value()));
+      if (cfg.containsKey(JobConfigProperty.WorkflowID.name())) {
+        b.setWorkflow(cfg.get(JobConfigProperty.WorkflowID.name()));
       }
-      if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) {
-        b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value()));
+      if (cfg.containsKey(JobConfigProperty.TargetResource.name())) {
+        b.setTargetResource(cfg.get(JobConfigProperty.TargetResource.name()));
       }
-      if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) {
-        b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value())));
+      if (cfg.containsKey(JobConfigProperty.TargetPartitions.name())) {
+        b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TargetPartitions.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) {
+      if (cfg.containsKey(JobConfigProperty.TargetPartitionStates.name())) {
         b.setTargetPartitionStates(new HashSet<String>(
-            Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(","))));
+            Arrays.asList(cfg.get(JobConfigProperty.TargetPartitionStates.name()).split(","))));
       }
-      if (cfg.containsKey(JobConfigProperty.COMMAND.value())) {
-        b.setCommand(cfg.get(JobConfigProperty.COMMAND.value()));
+      if (cfg.containsKey(JobConfigProperty.Command.name())) {
+        b.setCommand(cfg.get(JobConfigProperty.Command.name()));
       }
-      if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) {
+      if (cfg.containsKey(JobConfigProperty.JobCommandConfig.name())) {
         Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap(
-            cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value()));
+            cfg.get(JobConfigProperty.JobCommandConfig.name()));
         b.setJobCommandConfigMap(commandConfigMap);
       }
-      if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) {
-        b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value())));
+      if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) {
+        b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) {
+      if (cfg.containsKey(JobConfigProperty.ConcurrentTasksPerInstance.name())) {
         b.setNumConcurrentTasksPerInstance(
-            Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())));
+            Integer.parseInt(cfg.get(JobConfigProperty.ConcurrentTasksPerInstance.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) {
+      if (cfg.containsKey(JobConfigProperty.MaxAttemptsPerTask.name())) {
         b.setMaxAttemptsPerTask(
-            Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())));
+            Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) {
+      if (cfg.containsKey(JobConfigProperty.MaxForcedReassignmentsPerTask.name())) {
         b.setMaxForcedReassignmentsPerTask(
-            Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())));
+            Integer.parseInt(cfg.get(JobConfigProperty.MaxForcedReassignmentsPerTask.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) {
+      if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) {
         b.setFailureThreshold(
-            Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value())));
+            Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) {
-        b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value())));
+      if (cfg.containsKey(JobConfigProperty.TaskRetryDelay.name())) {
+        b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TaskRetryDelay.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) {
+      if (cfg.containsKey(JobConfigProperty.DisableExternalView.name())) {
         b.setDisableExternalView(
-            Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
+            Boolean.valueOf(cfg.get(JobConfigProperty.DisableExternalView.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())) {
+      if (cfg.containsKey(JobConfigProperty.IgnoreDependentJobFailure.name())) {
         b.setIgnoreDependentJobFailure(
-            Boolean.valueOf(cfg.get(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())));
+            Boolean.valueOf(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
       }
       return b;
     }
@@ -453,45 +443,45 @@ public class JobConfig {
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
-            String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE));
+            String.format("%s cannot be null", JobConfigProperty.TargetResource));
       }
       if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
           .isEmpty()) {
         throw new IllegalArgumentException(
-            String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES));
+            String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates));
       }
       if (_taskConfigMap.isEmpty() && _command == null) {
         throw new IllegalArgumentException(
-            String.format("%s cannot be null", JobConfigProperty.COMMAND));
+            String.format("%s cannot be null", JobConfigProperty.Command));
       }
       if (_timeoutPerTask < 0) {
         throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK,
+            .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition,
                 _timeoutPerTask));
       }
       if (_numConcurrentTasksPerInstance < 1) {
         throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+            .format("%s has invalid value %s", JobConfigProperty.ConcurrentTasksPerInstance,
                 _numConcurrentTasksPerInstance));
       }
       if (_maxAttemptsPerTask < 1) {
         throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK,
+            .format("%s has invalid value %s", JobConfigProperty.MaxAttemptsPerTask,
                 _maxAttemptsPerTask));
       }
       if (_maxForcedReassignmentsPerTask < 0) {
         throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+            .format("%s has invalid value %s", JobConfigProperty.MaxForcedReassignmentsPerTask,
                 _maxForcedReassignmentsPerTask));
       }
       if (_failureThreshold < 0) {
         throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD,
+            .format("%s has invalid value %s", JobConfigProperty.FailureThreshold,
                 _failureThreshold));
       }
       if (_workflow == null) {
         throw new IllegalArgumentException(
-            String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID));
+            String.format("%s cannot be null", JobConfigProperty.WorkflowID));
       }
     }
 

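Because Builder.fromMap() reads exactly the keys that getResourceConfigMap()
writes, the move to enum-derived keys keeps the serialize/deserialize round
trip intact. A sketch with a hypothetical job:

    import java.util.Map;
    import org.apache.helix.task.JobConfig;

    JobConfig original = new JobConfig.Builder()
        .setWorkflow("myWorkflow")
        .setCommand("Backup")
        .setTargetResource("myDB")
        .build();
    // Serialize to the string map stored in the resource config ...
    Map<String, String> serialized = original.getResourceConfigMap();
    // ... and reconstruct an equivalent JobConfig from it.
    JobConfig copy = JobConfig.Builder.fromMap(serialized).build();
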
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 73a5e58..32e1ffa 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -155,6 +154,10 @@ public class JobDag {
     }
   }
 
+  public int size() {
+    return _allNodes.size();
+  }
+
   /**
    * Checks that dag contains no cycles and all nodes are reachable.
    */

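The new size() accessor is what Workflow.validate() (further down in this
patch) uses to enforce queue capacity:

    int jobs = workflowConfig.getJobDag().size(); // number of job nodes in the DAG
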
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index 0280c88..c350fee 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -29,35 +29,25 @@ import java.util.Map;
  * A named queue to which jobs can be added
  */
 public class JobQueue extends Workflow {
-  /* Config fields */
-  public static final String CAPACITY = "CAPACITY";
 
-  private final int _capacity;
-
-  private JobQueue(String name, int capacity, WorkflowConfig workflowConfig,
+  private JobQueue(String name, WorkflowConfig workflowConfig,
       Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
     super(name, workflowConfig, jobConfigs, taskConfigs);
-    _capacity = capacity;
-    validate();
   }
 
   /**
    * Determine the number of jobs that this queue can accept before rejecting further jobs
+   * This method is deprecated; please use
+   * JobQueue.getWorkflowConfig().getCapacity() instead.
    * @return queue capacity
    */
+  @Deprecated
   public int getCapacity() {
-    return _capacity;
-  }
-
-  public Map<String, String> getResourceConfigMap() throws Exception {
-    Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap();
-    cfgMap.put(CAPACITY, String.valueOf(_capacity));
-    return cfgMap;
+    return _workflowConfig.getCapacity();
   }
 
   /** Supports creation of a single queue */
   public static class Builder extends Workflow.Builder {
-    private int _capacity = Integer.MAX_VALUE;
     private List<String> jobs;
 
     public Builder(String name) {
@@ -65,42 +55,74 @@ public class JobQueue extends Workflow {
       jobs = new ArrayList<String>();
     }
 
-    public Builder expiry(long expiry) {
-      _expiry = expiry;
-      return this;
-    }
-
-    public Builder capacity(int capacity) {
-      _capacity = capacity;
-      return this;
-    }
-
+    /**
+     * Do not use this method; use workflowConfigBuilder.setCapacity() instead.
+     * If you set the capacity via this method, the number given here
+     * will override any capacity set elsewhere.
+     * @param capacity
+     * @return
+     */
     @Override
-    public Builder fromMap(Map<String, String> cfg) {
-      super.fromMap(cfg);
-      if (cfg.containsKey(CAPACITY)) {
-        _capacity = Integer.parseInt(cfg.get(CAPACITY));
-      }
+    public Builder setCapacity(int capacity) {
+      super.setCapacity(capacity);
       return this;
     }
 
-    public void enqueueJob(final String job, JobConfig.Builder jobBuilder) {
-      if (jobs.size() >= _capacity) {
-        throw new HelixException("Failed to push new job to jobQueue, it is already full");
+    public Builder enqueueJob(final String job, JobConfig.Builder jobBuilder) {
+      if (_workflowConfigBuilder != null) {
+        if (jobs.size() >= _workflowConfigBuilder.getCapacity()) {
+          throw new HelixException("Failed to push new job to jobQueue, it is already full");
+        }
       }
-      addJobConfig(job, jobBuilder);
+
+      addJob(job, jobBuilder);
       if (jobs.size() > 0) {
         String previousJob = jobs.get(jobs.size() - 1);
         addParentChildDependency(previousJob, job);
       }
       jobs.add(job);
+      return this;
+    }
+
+    /**
+     * Please use setWorkflowConfigMap() instead.
+     * @param workflowCfgMap
+     * @return
+     */
+    @Override
+    public Builder fromMap(Map<String, String> workflowCfgMap) {
+      return setWorkflowConfigMap(workflowCfgMap);
+    }
+
+    @Override
+    public Builder setWorkflowConfigMap(Map<String, String> workflowCfgMap) {
+      super.setWorkflowConfigMap(workflowCfgMap);
+      return this;
+    }
+
+    @Override
+    public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
+      super.setWorkflowConfig(workflowConfig);
+      return this;
+    }
+
+    @Override
+    public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
+      super.setScheduleConfig(scheduleConfig);
+      return this;
+    }
+
+    @Override
+    public Builder setExpiry(long expiry) {
+      super.setExpiry(expiry);
+      return this;
     }
 
+    @Override
     public JobQueue build() {
-      WorkflowConfig.Builder builder = buildWorkflowConfig();
-      builder.setTerminable(false);
-      WorkflowConfig workflowConfig = builder.build();
-      return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs);
+      buildConfig();
+      _workflowConfigBuilder.setTerminable(false);
+      return new JobQueue(_name, _workflowConfigBuilder.build(), _jobConfigs, _taskConfigs);
     }
   }
 }

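With enqueueJob() now returning the builder, a queue can be assembled
fluently, and capacity lives in WorkflowConfig instead of the removed
CAPACITY field. A sketch (names hypothetical):

    JobConfig.Builder backupJob = new JobConfig.Builder()
        .setCommand("Backup")
        .setTargetResource("myDB");

    WorkflowConfig queueCfg = new WorkflowConfig.Builder()
        .setCapacity(10)                 // replaces the old JobQueue.CAPACITY field
        .build();

    JobQueue queue = new JobQueue.Builder("myQueue")
        .setWorkflowConfig(queueCfg)     // build() still forces Terminable=false
        .enqueueJob("backup1", backupJob)
        .build();
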
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c3eb8bd..99bcb62 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -45,7 +45,6 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
@@ -292,7 +291,8 @@ public class TaskDriver {
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        JobDag jobDag = JobDag.fromJson(
+            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
         for (String resourceName : toRemove) {
           for (String child : jobDag.getDirectChildren(resourceName)) {
             jobDag.getChildrenToParents().get(child).remove(resourceName);
@@ -305,7 +305,8 @@ public class TaskDriver {
           jobDag.getAllNodes().remove(resourceName);
         }
         try {
-          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+          currentData
+              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
           throw new IllegalArgumentException(e);
         }
@@ -432,7 +433,8 @@ public class TaskDriver {
           return null;
         }
         // Add the node to the existing DAG
-        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        JobDag jobDag = JobDag.fromJson(
+            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
         Set<String> allNodes = jobDag.getAllNodes();
         if (!allNodes.contains(namespacedJobName)) {
           LOG.warn(
@@ -458,7 +460,8 @@ public class TaskDriver {
 
         // Save the updated DAG
         try {
-          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+          currentData
+              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
           throw new IllegalStateException(
               "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
@@ -509,18 +512,16 @@ public class TaskDriver {
   public void enqueueJob(final String queueName, final String jobName,
       JobConfig.Builder jobBuilder) {
     // Get the job queue config and capacity
-    HelixProperty workflowConfig =
-        _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_accessor, queueName);
     if (workflowConfig == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      throw new IllegalArgumentException("Queue " + queueName + " config does not yet exist!");
     }
-    boolean isTerminable =
-        workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+    boolean isTerminable = workflowConfig.isTerminable();
     if (isTerminable) {
       throw new IllegalArgumentException(queueName + " is not a queue!");
     }
-    final int capacity =
-        workflowConfig.getRecord().getIntField(JobQueue.CAPACITY, Integer.MAX_VALUE);
+
+    final int capacity = workflowConfig.getCapacity();
 
     // Create the job to ensure that it validates
     JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
@@ -535,9 +536,10 @@ public class TaskDriver {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         // Add the node to the existing DAG
-        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        JobDag jobDag = JobDag.fromJson(
+            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
         Set<String> allNodes = jobDag.getAllNodes();
-        if (allNodes.size() >= capacity) {
+        if (capacity > 0 && allNodes.size() >= capacity) {
           throw new IllegalStateException(
               "Queue " + queueName + " is at capacity, will not add " + jobName);
         }
@@ -561,7 +563,8 @@ public class TaskDriver {
 
         // Save the updated DAG
         try {
-          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+          currentData
+              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
           throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
               e);
@@ -689,7 +692,8 @@ public class TaskDriver {
           // Only update target state for non-completed workflows
           String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
           if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
-            currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+            currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+                state.name());
           } else {
             LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
           }

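On the driver side, enqueueJob() now reads the typed WorkflowConfig rather
than raw HelixProperty fields, and the "capacity > 0" guard above means a
non-positive capacity disables the size check entirely. Typical usage is
unchanged; a sketch assuming an existing HelixManager and job builder:

    TaskDriver driver = new TaskDriver(helixManager);
    // Throws IllegalStateException once the queue's DAG has reached the
    // configured capacity.
    driver.enqueueJob("myQueue", "backup1", backupJob);
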
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 49622f3..513c14e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,14 +20,9 @@ package org.apache.helix.task;
  */
 
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -36,7 +31,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -46,7 +40,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -319,34 +312,6 @@ public class TaskUtil {
     }
   }
 
-  /**
-   * Get a ScheduleConfig from a workflow config string map
-   *
-   * @param cfg the string map
-   * @return a ScheduleConfig if one exists, otherwise null
-   */
-  public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
-    // Parse schedule-specific configs, if they exist
-    Date startTime = null;
-    if (cfg.containsKey(WorkflowConfig.START_TIME)) {
-      try {
-        startTime = WorkflowConfig.getDefaultDateFormat().parse(cfg.get(WorkflowConfig.START_TIME));
-      } catch (ParseException e) {
-        LOG.error("Unparseable date " + cfg.get(WorkflowConfig.START_TIME), e);
-        return null;
-      }
-    }
-    if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) && cfg
-        .containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
-      return ScheduleConfig
-          .recurringFromDate(startTime, TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
-              Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
-    } else if (startTime != null) {
-      return ScheduleConfig.oneTimeDelayedStart(startTime);
-    }
-    return null;
-  }
-
   private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));

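Callers of the removed helper only need to switch classes; the parsing logic
moves verbatim into WorkflowConfig (see below). A sketch with a hypothetical
simple-field map cfg:

    // Before: ScheduleConfig schedule = TaskUtil.parseScheduleFromConfigMap(cfg);
    ScheduleConfig schedule = WorkflowConfig.parseScheduleFromConfigMap(cfg);
    if (schedule != null && schedule.isRecurring()) {
      // RecurrenceUnit and RecurrenceInterval were both present
    }
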
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 1706bec..f3abc2e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.task.beans.JobBean;
 import org.apache.helix.task.beans.TaskBean;
 import org.apache.helix.task.beans.WorkflowBean;
@@ -87,6 +88,14 @@ public class Workflow {
   }
 
   /**
+   * @return Resource configuration key/value map.
+   * @throws HelixException
+   */
+  public Map<String, String> getResourceConfigMap() throws HelixException {
+    return _workflowConfig.getResourceConfigMap();
+  }
+
+  /**
    * Parses the YAML description from a file into a {@link Workflow} object.
    * @param file An abstract path name to the file containing the workflow description.
    * @return A {@link Workflow} object.
@@ -134,7 +143,7 @@ public class Workflow {
   private static Workflow parse(Reader reader) throws Exception {
     Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
-    Builder builder = new Builder(wf.name);
+    Builder workflowBuilder = new Builder(wf.name);
 
     if (wf != null && wf.jobs != null) {
       for (JobBean job : wf.jobs) {
@@ -144,53 +153,55 @@ public class Workflow {
 
         if (job.parents != null) {
           for (String parent : job.parents) {
-            builder.addParentChildDependency(parent, job.name);
+            workflowBuilder.addParentChildDependency(parent, job.name);
           }
         }
 
-        builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name);
-        builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command);
+        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.WorkflowID.name(), wf.name);
+        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.Command.name(), job.command);
         if (job.jobConfigMap != null) {
-          builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
+          workflowBuilder.addJobCommandConfigMap(job.name, job.jobConfigMap);
         }
-        builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(),
+        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetResource.name(),
             job.targetResource);
         if (job.targetPartitionStates != null) {
-          builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(),
+          workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitionStates.name(),
               Joiner.on(",").join(job.targetPartitionStates));
         }
         if (job.targetPartitions != null) {
-          builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(),
+          workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitions.name(),
               Joiner.on(",").join(job.targetPartitions));
         }
-        builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(),
+        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.MaxAttemptsPerTask.name(),
             String.valueOf(job.maxAttemptsPerTask));
-        builder.addConfig(job.name,
-            JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+        workflowBuilder.addConfig(job.name,
+            JobConfig.JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
             String.valueOf(job.maxForcedReassignmentsPerTask));
-        builder.addConfig(job.name,
-            JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+        workflowBuilder.addConfig(job.name,
+            JobConfig.JobConfigProperty.ConcurrentTasksPerInstance.name(),
             String.valueOf(job.numConcurrentTasksPerInstance));
-        builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(),
+        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TimeoutPerPartition.name(),
             String.valueOf(job.timeoutPerPartition));
-        builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(),
+        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.FailureThreshold.name(),
             String.valueOf(job.failureThreshold));
         if (job.tasks != null) {
           List<TaskConfig> taskConfigs = Lists.newArrayList();
           for (TaskBean task : job.tasks) {
             taskConfigs.add(TaskConfig.from(task));
           }
-          builder.addTaskConfigs(job.name, taskConfigs);
+          workflowBuilder.addTaskConfigs(job.name, taskConfigs);
         }
       }
     }
 
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
     if (wf.schedule != null) {
-      builder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
+      workflowCfgBuilder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
     }
-    builder.setExpiry(wf.expiry);
+    workflowCfgBuilder.setExpiry(wf.expiry);
+    workflowBuilder.setWorkflowConfig(workflowCfgBuilder.build());
 
-    return builder.build();
+    return workflowBuilder.build();
   }
 
   /**
@@ -212,6 +223,13 @@ public class Workflow {
           ", names in dag but not in config: " + jobNamesInDagButNotInConfig);
     }
 
+    int capacity = _workflowConfig.getCapacity();
+    int dagSize = _workflowConfig.getJobDag().size();
+    if (capacity > 0 && dagSize > capacity) {
+      throw new IllegalArgumentException(String.format(
+          "Failed to build workflow %s, number of jobs are more than its capacity! capacity(%d), jobs(%d)",
+          _name, capacity, dagSize));
+    }
     _workflowConfig.getJobDag().validate();
 
     for (String node : _jobConfigs.keySet()) {
@@ -234,17 +252,13 @@ public class Workflow {
     protected JobDag _dag;
     protected Map<String, Map<String, String>> _jobConfigs;
     protected Map<String, List<TaskConfig>> _taskConfigs;
-    protected ScheduleConfig _scheduleConfig;
-    protected long _expiry = -1;
-    protected Map<String, String> _cfgMap;
-    protected int _parallelJobs = -1;
+    protected WorkflowConfig.Builder _workflowConfigBuilder;
 
     public Builder(String name) {
       _name = name;
       _dag = new JobDag();
       _jobConfigs = new TreeMap<String, Map<String, String>>();
       _taskConfigs = new TreeMap<String, List<TaskConfig>>();
-      _expiry = -1;
     }
 
     protected Builder addConfig(String job, String key, String val) {
@@ -258,11 +272,22 @@ public class Workflow {
     }
 
     private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
-      return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(),
+      return addConfig(job, JobConfig.JobConfigProperty.JobCommandConfig.name(),
           TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
     }
 
+    /**
+     * Please use addJob() instead.
+     * @param job
+     * @param jobConfigBuilder
+     * @return
+     */
+    @Deprecated
     public Builder addJobConfig(String job, JobConfig.Builder jobConfigBuilder) {
+      return addJob(job, jobConfigBuilder);
+    }
+
+    public Builder addJob(String job, JobConfig.Builder jobConfigBuilder) {
       JobConfig jobConfig = jobConfigBuilder.setWorkflow(_name).build();
       for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
         String key = e.getKey();
@@ -294,62 +319,73 @@ public class Workflow {
       return this;
     }
 
-    public Builder fromMap(Map<String, String> cfg) {
-      _cfgMap = cfg;
+    /**
+     * Please use setWorkflowConfigMap() instead.
+     * @param workflowCfgMap
+     * @return
+     */
+    public Builder fromMap(Map<String, String> workflowCfgMap) {
+      return setWorkflowConfigMap(workflowCfgMap);
+    }
+
+    public Builder setWorkflowConfigMap(Map<String, String> workflowCfgMap) {
+      if (workflowCfgMap != null && !workflowCfgMap.isEmpty()) {
+        if (_workflowConfigBuilder == null) {
+          _workflowConfigBuilder = WorkflowConfig.Builder.fromMap(workflowCfgMap);
+        } else {
+          _workflowConfigBuilder.setConfigMap(workflowCfgMap);
+        }
+      }
+      return this;
+    }
+
+    public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
+      _workflowConfigBuilder = new WorkflowConfig.Builder(workflowConfig);
       return this;
     }
 
     public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
-      _scheduleConfig = scheduleConfig;
+      if (_workflowConfigBuilder == null) {
+        _workflowConfigBuilder = new WorkflowConfig.Builder();
+      }
+      _workflowConfigBuilder.setScheduleConfig(scheduleConfig);
       return this;
     }
 
     public Builder setExpiry(long expiry) {
-      _expiry = expiry;
+      if (_workflowConfigBuilder == null) {
+        _workflowConfigBuilder = new WorkflowConfig.Builder();
+      }
+      _workflowConfigBuilder.setExpiry(expiry);
       return this;
     }
 
-    public String namespacify(String job) {
-      return TaskUtil.getNamespacedJobName(_name, job);
+    @Deprecated
+    public Builder setCapacity(int capacity) {
+      if (_workflowConfigBuilder == null) {
+        _workflowConfigBuilder = new WorkflowConfig.Builder();
+      }
+      _workflowConfigBuilder.setCapacity(capacity);
+      return this;
     }
 
-    public Builder parallelJobs(int parallelJobs) {
-      _parallelJobs = parallelJobs;
-      return this;
+    public String namespacify(String job) {
+      return TaskUtil.getNamespacedJobName(_name, job);
     }
 
     public Workflow build() {
-      WorkflowConfig.Builder builder = buildWorkflowConfig();
-      // calls validate internally
-      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs);
+      buildConfig();
+      return new Workflow(_name, _workflowConfigBuilder.build(), _jobConfigs, _taskConfigs);
     }
 
-    protected WorkflowConfig.Builder buildWorkflowConfig() {
+    protected void buildConfig() {
       for (String task : _jobConfigs.keySet()) {
-        // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
-        _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name);
-      }
-
-      WorkflowConfig.Builder builder;
-      if (_cfgMap != null) {
-        builder = WorkflowConfig.Builder.fromMap(_cfgMap);
-      } else {
-        builder = new WorkflowConfig.Builder();
+        _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WorkflowID.name(), _name);
       }
-
-      builder.setJobDag(_dag);
-      builder.setTargetState(TargetState.START);
-      if (_scheduleConfig != null) {
-        builder.setScheduleConfig(_scheduleConfig);
+      if (_workflowConfigBuilder == null) {
+        _workflowConfigBuilder = new WorkflowConfig.Builder();
       }
-      if (_expiry > 0) {
-        builder.setExpiry(_expiry);
-      }
-      if (_parallelJobs > 0) {
-        builder.setParallelJobs(_parallelJobs);
-      }
-
-      return builder;
+      _workflowConfigBuilder.setJobDag(_dag);
     }
   }
 }

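One subtlety of the reworked Workflow.Builder: setExpiry() and
setScheduleConfig() lazily create the internal WorkflowConfig.Builder, while
setWorkflowConfig() replaces it outright, discarding anything set earlier. A
sketch (someConfig is a hypothetical existing WorkflowConfig):

    Workflow.Builder builder = new Workflow.Builder("myWorkflow");
    builder.setExpiry(3600 * 1000L);       // lazily creates the internal config builder
    builder.setWorkflowConfig(someConfig); // replaces it; the expiry set above is lost
    // Safer: set the base config first, then apply individual overrides.
    builder.setWorkflowConfig(someConfig).setExpiry(3600 * 1000L);
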
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 955cb77..db9fdba 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 import java.io.IOException;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
@@ -26,21 +27,34 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
 
 /**
  * Provides a typed interface to workflow level configurations. Validates the configurations.
  */
 public class WorkflowConfig {
-  /* Config fields */
-  public static final String DAG = "Dag";
-  public static final String PARALLEL_JOBS = "ParallelJobs";
-  public static final String TARGET_STATE = "TargetState";
-  public static final String EXPIRY = "Expiry";
-  public static final String START_TIME = "StartTime";
-  public static final String RECURRENCE_UNIT = "RecurrenceUnit";
-  public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
-  public static final String TERMINABLE = "Terminable";
-  public static final String FAILURE_THRESHOLD = "FailureThreshold";
+  private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
+
+  /**
+   * Do not use these values directly; always use the getters and setters
+   * in WorkflowConfig and WorkflowConfig.Builder.
+   *
+   * For backward compatibility, this enum will remain public for a while,
+   * but it will be changed to protected in a future major release.
+   */
+  public enum WorkflowConfigProperty {
+    Dag,
+    ParallelJobs,
+    TargetState,
+    Expiry,
+    StartTime,
+    RecurrenceUnit,
+    RecurrenceInterval,
+    Terminable,
+    FailureThreshold,
+    /* This is for a non-terminable workflow. */
+    capacity
+  }
 
   /* Default values */
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
@@ -59,9 +73,10 @@ public class WorkflowConfig {
   private final boolean _terminable;
   private final ScheduleConfig _scheduleConfig;
   private final int _failureThreshold;
+  private final int _capacity;
 
   protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
-      int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig) {
+      int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity) {
     _jobDag = jobDag;
     _parallelJobs = parallelJobs;
     _targetState = targetState;
@@ -69,6 +84,7 @@ public class WorkflowConfig {
     _failureThreshold = failureThreshold;
     _terminable = terminable;
     _scheduleConfig = scheduleConfig;
+    _capacity = capacity;
   }
 
   public JobDag getJobDag() {
@@ -91,6 +107,13 @@ public class WorkflowConfig {
     return _failureThreshold;
   }
 
+  /**
+   * Determine the number of jobs that this workflow can accept before rejecting further jobs.
+   * This field is only used when the workflow is not terminable.
+   * @return queue capacity
+   */
+  public int getCapacity() { return _capacity; }
+
   public boolean isTerminable() {
     return _terminable;
   }
@@ -128,15 +151,20 @@ public class WorkflowConfig {
   public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
     try {
-      cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+      cfgMap.put(WorkflowConfigProperty.Dag.name(), getJobDag().toJson());
     } catch (IOException ex) {
       throw new HelixException("Invalid job dag configuration!", ex);
     }
-    cfgMap.put(WorkflowConfig.PARALLEL_JOBS, String.valueOf(getParallelJobs()));
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
-    cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
-    cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable()));
-    cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(getFailureThreshold()));
+    cfgMap.put(WorkflowConfigProperty.ParallelJobs.name(), String.valueOf(getParallelJobs()));
+    cfgMap.put(WorkflowConfigProperty.Expiry.name(), String.valueOf(getExpiry()));
+    cfgMap.put(WorkflowConfigProperty.TargetState.name(), getTargetState().name());
+    cfgMap.put(WorkflowConfigProperty.Terminable.name(), String.valueOf(isTerminable()));
+    cfgMap.put(WorkflowConfigProperty.FailureThreshold.name(),
+        String.valueOf(getFailureThreshold()));
+
+    if (_capacity > 0) {
+      cfgMap.put(WorkflowConfigProperty.capacity.name(), String.valueOf(_capacity));
+    }
 
     // Populate schedule if present
     ScheduleConfig scheduleConfig = getScheduleConfig();
@@ -144,17 +172,50 @@ public class WorkflowConfig {
       Date startTime = scheduleConfig.getStartTime();
       if (startTime != null) {
         String formattedTime = WorkflowConfig.getDefaultDateFormat().format(startTime);
-        cfgMap.put(WorkflowConfig.START_TIME, formattedTime);
+        cfgMap.put(WorkflowConfigProperty.StartTime.name(), formattedTime);
       }
       if (scheduleConfig.isRecurring()) {
-        cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString());
-        cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval()
-            .toString());
+        cfgMap.put(WorkflowConfigProperty.RecurrenceUnit.name(),
+            scheduleConfig.getRecurrenceUnit().toString());
+        cfgMap.put(WorkflowConfigProperty.RecurrenceInterval.name(),
+            scheduleConfig.getRecurrenceInterval().toString());
       }
     }
     return cfgMap;
   }
 
+  /**
+   * Get a ScheduleConfig from a workflow config string map
+   *
+   * @param cfg the string map
+   * @return a ScheduleConfig if one exists, otherwise null
+   */
+  public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
+    // Parse schedule-specific configs, if they exist
+    Date startTime = null;
+    if (cfg.containsKey(WorkflowConfigProperty.StartTime.name())) {
+      try {
+        startTime = WorkflowConfig.getDefaultDateFormat()
+            .parse(cfg.get(WorkflowConfigProperty.StartTime.name()));
+      } catch (ParseException e) {
+        LOG.error(
+            "Unparseable date " + cfg.get(WorkflowConfigProperty.StartTime.name()),
+            e);
+        return null;
+      }
+    }
+    if (cfg.containsKey(WorkflowConfigProperty.RecurrenceUnit.name()) && cfg
+        .containsKey(WorkflowConfigProperty.RecurrenceInterval.name())) {
+      return ScheduleConfig.recurringFromDate(startTime,
+          TimeUnit.valueOf(cfg.get(WorkflowConfigProperty.RecurrenceUnit.name())),
+          Long.parseLong(
+              cfg.get(WorkflowConfigProperty.RecurrenceInterval.name())));
+    } else if (startTime != null) {
+      return ScheduleConfig.oneTimeDelayedStart(startTime);
+    }
+    return null;
+  }
+
   public static class Builder {
     private JobDag _taskDag = JobDag.EMPTY_DAG;
     private int _parallelJobs = 1;
@@ -162,13 +223,14 @@ public class WorkflowConfig {
     private long _expiry = DEFAULT_EXPIRY;
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
     private boolean _isTerminable = true;
+    private int _capacity = Integer.MAX_VALUE;
     private ScheduleConfig _scheduleConfig;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
-          _isTerminable, _scheduleConfig);
+          _isTerminable, _scheduleConfig, _capacity);
     }
 
     public Builder() {}
@@ -180,9 +242,11 @@ public class WorkflowConfig {
       _expiry = workflowConfig.getExpiry();
       _isTerminable = workflowConfig.isTerminable();
       _scheduleConfig = workflowConfig.getScheduleConfig();
+      _capacity = workflowConfig.getCapacity();
+      _failureThreshold = workflowConfig.getFailureThreshold();
     }
 
-    public Builder setJobDag(JobDag v) {
+    protected Builder setJobDag(JobDag v) {
       _taskDag = v;
       return this;
     }
@@ -207,6 +271,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setCapacity(int capacity) {
+      _capacity = capacity;
+      return this;
+    }
+
     public Builder setTerminable(boolean isTerminable) {
       _isTerminable = isTerminable;
       return this;
@@ -223,43 +292,95 @@ public class WorkflowConfig {
     }
 
     public static Builder fromMap(Map<String, String> cfg) {
-      Builder b = new Builder();
-      if (cfg.containsKey(EXPIRY)) {
-        b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
+      Builder builder = new Builder();
+      builder.setConfigMap(cfg);
+      return builder;
+    }
+
+    public Builder setConfigMap(Map<String, String> cfg) {
+      if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
+        setExpiry(Long.parseLong(cfg.get(WorkflowConfigProperty.Expiry.name())));
       }
-      if (cfg.containsKey(FAILURE_THRESHOLD)) {
-        b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
-      }
-      if (cfg.containsKey(DAG)) {
-        b.setJobDag(JobDag.fromJson(cfg.get(DAG)));
+      if (cfg.containsKey(WorkflowConfigProperty.Dag.name())) {
+        setJobDag(JobDag.fromJson(cfg.get(WorkflowConfigProperty.Dag.name())));
       }
-      if (cfg.containsKey(TARGET_STATE)) {
-        b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
+      if (cfg.containsKey(WorkflowConfigProperty.TargetState.name())) {
+        setTargetState(TargetState.valueOf(cfg.get(WorkflowConfigProperty.TargetState.name())));
       }
-      if (cfg.containsKey(TERMINABLE)) {
-        b.setTerminable(Boolean.parseBoolean(cfg.get(TERMINABLE)));
+      if (cfg.containsKey(WorkflowConfigProperty.Terminable.name())) {
+        setTerminable(Boolean.parseBoolean(cfg.get(WorkflowConfigProperty.Terminable.name())));
       }
-      if (cfg.containsKey(PARALLEL_JOBS)) {
-        String value = cfg.get(PARALLEL_JOBS);
+      if (cfg.containsKey(WorkflowConfigProperty.ParallelJobs.name())) {
+        String value = cfg.get(WorkflowConfigProperty.ParallelJobs.name());
         if (value == null) {
-          b.setParallelJobs(1);
+          setParallelJobs(1);
         } else {
-          b.setParallelJobs(Integer.parseInt(value));
+          setParallelJobs(Integer.parseInt(value));
+        }
+      }
+
+      if (cfg.containsKey(WorkflowConfigProperty.capacity.name())) {
+        int capacity = Integer.parseInt(cfg.get(WorkflowConfigProperty.capacity.name()));
+        if (capacity > 0) {
+          setCapacity(capacity);
+        }
+      }
+
+      if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
+        int threshold = Integer.parseInt(cfg.get(WorkflowConfigProperty.FailureThreshold.name()));
+        if (threshold >= 0) {
+          setFailureThreshold(threshold);
         }
       }
 
       // Parse schedule-specific configs, if they exist
-      ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg);
+      ScheduleConfig scheduleConfig = parseScheduleFromConfigMap(cfg);
       if (scheduleConfig != null) {
-        b.setScheduleConfig(scheduleConfig);
+        setScheduleConfig(scheduleConfig);
       }
-      return b;
+      return this;
+    }
+
+    public int getParallelJobs() {
+      return _parallelJobs;
+    }
+
+    public TargetState getTargetState() {
+      return _targetState;
+    }
+
+    public long getExpiry() {
+      return _expiry;
+    }
+
+    public int getFailureThreshold() {
+      return _failureThreshold;
+    }
+
+    public boolean isTerminable() {
+      return _isTerminable;
+    }
+
+    public int getCapacity() {
+      return _capacity;
+    }
+
+    public ScheduleConfig getScheduleConfig() {
+      return _scheduleConfig;
+    }
+
+    public JobDag getJobDag() {
+      return _taskDag;
     }
 
     private void validate() {
       if (_expiry < 0) {
-        throw new IllegalArgumentException(
-            String.format("%s has invalid value %s", EXPIRY, _expiry));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", WorkflowConfigProperty.Expiry.name(), _expiry));
       } else if (_scheduleConfig != null && !_scheduleConfig.isValid()) {
         throw new IllegalArgumentException(
             "Scheduler configuration is invalid. The configuration must have a start time if it is "

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 682ac77..9d1106a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -75,11 +75,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     if (targetState == TargetState.STOP) {
      LOG.info("Workflow " + workflow + " is marked as stopped.");
-      // Workflow has been stopped if all jobs are stopped
-      // TODO: what should we do if workflowCtx is not set yet?
-      if (workflowCtx != null && isWorkflowStopped(workflowCtx, workflowCfg)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -339,25 +334,24 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     // Create a new workflow with a new name
-    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
-    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
-    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
-
-    // Set the workflow expiry
-    workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+    Map<String, String> workflowConfigsMap =
+        resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
+    WorkflowConfig.Builder workflowConfigBuilder =
+        WorkflowConfig.Builder.fromMap(workflowConfigsMap);
 
     // Set the schedule, if applicable
-    ScheduleConfig scheduleConfig;
     if (newStartTime != null) {
-      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-    } else {
-      scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields);
-    }
-    if (scheduleConfig != null) {
-      workflowBuilder.setScheduleConfig(scheduleConfig);
+      ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+      workflowConfigBuilder.setScheduleConfig(scheduleConfig);
     }
+    workflowConfigBuilder.setTerminable(true);
+
+    WorkflowConfig workflowConfig = workflowConfigBuilder.build();
+
+    JobDag jobDag = workflowConfig.getJobDag();
+    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
+    workflowBuilder.setWorkflowConfig(workflowConfig);
 
     // Add each job back as long as the original exists
     Set<String> namespacedJobs = jobDag.getAllNodes();
@@ -454,10 +448,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
     // Remove DAG references in workflow
     PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
     DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         if (currentData != null) {
-          JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+          JobDag jobDag = JobDag.fromJson(
+              currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
           for (String child : jobDag.getDirectChildren(job)) {
             jobDag.getChildrenToParents().get(child).remove(job);
           }
@@ -468,7 +462,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
           jobDag.getParentsToChildren().remove(job);
           jobDag.getAllNodes().remove(job);
           try {
-            currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+            currentData
+                .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
           } catch (Exception e) {
             LOG.error("Could not update DAG for job: " + job, e);
           }
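
Aside (illustrative, not in the commit): the clone-and-reschedule logic above
reduces to round-tripping the persisted simple fields through
WorkflowConfig.Builder.fromMap and overriding only what changes. The method and
parameter names here are hypothetical.

  import java.util.Date;
  import java.util.Map;

  import org.apache.helix.task.ScheduleConfig;
  import org.apache.helix.task.WorkflowConfig;

  public class WorkflowCloneSketch {
    // Rebuild a workflow config from the simple fields persisted in the
    // resource config, optionally replacing the schedule with a one-time
    // delayed start. Scheduled clones are marked terminable so they run
    // once and then expire.
    static WorkflowConfig cloneConfig(Map<String, String> persistedFields,
        Date newStartTime) {
      WorkflowConfig.Builder builder = WorkflowConfig.Builder.fromMap(persistedFields);
      if (newStartTime != null) {
        builder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(newStartTime));
      }
      builder.setTerminable(true);
      return builder.build();
    }
  }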

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 422ec88..3e5385c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -22,20 +22,19 @@ import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.helix.HelixManager;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.testng.Assert;
@@ -121,8 +120,7 @@ public class TaskTestUtil {
     final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
     boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
 
       @Override
       public boolean verify() throws Exception {
         WorkflowContext ctx = driver.getWorkflowContext(workflowName);
         return ctx == null || ctx.getJobState(namespacedJobName) == null;
       }
@@ -242,36 +240,46 @@ public class TaskTestUtil {
 
   public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
       int recurrenceInSeconds) {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
-    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(recurrenInSeconds));
-    cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
+    return buildRecurrentJobQueue(jobQueueName, delayStart, recurrenceInSeconds, null);
+  }
+
+  public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
+      int recurrenceInSeconds, TargetState targetState) {
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
+    workflowCfgBuilder.setExpiry(120000);
+    if (targetState != null) {
+      workflowCfgBuilder.setTargetState(targetState);
+    }
+
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
     cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
     cal.set(Calendar.MILLISECOND, 0);
-    cfgMap.put(WorkflowConfig.START_TIME,
-        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
-    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+    ScheduleConfig scheduleConfig =
+        ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenceInSeconds);
+    workflowCfgBuilder.setScheduleConfig(scheduleConfig);
+    return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
   }
 
   public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
     return buildRecurrentJobQueue(jobQueueName, 0);
   }
 
-  public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, int failureThreshold) {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+  public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
+      int failureThreshold) {
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
+    workflowCfgBuilder.setExpiry(120000);
+
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
     cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
     cal.set(Calendar.MILLISECOND, 0);
-    cfgMap.put(WorkflowConfig.START_TIME,
-        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    workflowCfgBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(cal.getTime()));
+
     if (failureThreshold > 0) {
-      cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(failureThreshold));
+      workflowCfgBuilder.setFailureThreshold(failureThreshold);
     }
-    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+    return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
   }
 
   public static JobQueue.Builder buildJobQueue(String jobQueueName) {
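
For comparison (sketch only; the queue name and timings are invented): the old
string-map construction, expressed with the typed builders the helpers above
now use.

  import java.util.Calendar;
  import java.util.concurrent.TimeUnit;

  import org.apache.helix.task.JobQueue;
  import org.apache.helix.task.ScheduleConfig;
  import org.apache.helix.task.WorkflowConfig;

  public class QueueBuilderSketch {
    public static void main(String[] args) {
      // First run 30 seconds from now, then recur every 60 seconds.
      Calendar cal = Calendar.getInstance();
      cal.add(Calendar.SECOND, 30);

      WorkflowConfig workflowCfg = new WorkflowConfig.Builder()
          .setExpiry(120000)
          .setScheduleConfig(ScheduleConfig.recurringFromDate(
              cal.getTime(), TimeUnit.SECONDS, 60L))
          .build();

      // Jobs would be enqueued on the builder before build().
      JobQueue queue = new JobQueue.Builder("myTestQueue")
          .setWorkflowConfig(workflowCfg)
          .build();
    }
  }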

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index ae3d52d..b2e61ca 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -40,14 +40,13 @@ import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
@@ -237,11 +236,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     for (int i = 0; i <= 4; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
-      JobConfig.Builder job =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-              .setJobCommandConfigMap(commandConfig)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
       LOG.info("Enqueuing job: " + jobName);
       queueBuilder.enqueueJob(jobName, job);
@@ -331,10 +328,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     for (int i = 0; i < JOB_COUNTS; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
-      JobConfig.Builder job =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(commandConfig)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       jobs.add(job);
       jobNames.add(targetPartition.toLowerCase() + "Job" + i);
     }
@@ -370,6 +366,42 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
   }
 
   @Test
+  public void testCreateStoppedQueue() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000,
+        TargetState.STOP);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i <= 1; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      String jobName = targetPartition.toLowerCase() + "Job" + i;
+      queueBuild.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.createQueue(queueBuild.build());
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
+    Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+    _driver.resume(queueName);
+
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+
+    // ensure the currently scheduled workflow runs to completion
+    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+  }
+
+  @Test
   public void testGetNoExistWorkflowConfig() {
     String randomName = "randomJob";
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName);
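
Distilled from testCreateStoppedQueue above (a sketch; the queue name is
hypothetical and the TaskDriver is assumed to be connected): a queue can now be
created with TargetState.STOP so that nothing runs until it is explicitly
resumed.

  import org.apache.helix.task.JobQueue;
  import org.apache.helix.task.TargetState;
  import org.apache.helix.task.TaskDriver;
  import org.apache.helix.task.WorkflowConfig;

  public class StoppedQueueSketch {
    static void createStopped(TaskDriver driver) throws Exception {
      WorkflowConfig cfg = new WorkflowConfig.Builder()
          .setTargetState(TargetState.STOP)  // queue exists but does not run
          .build();
      JobQueue queue = new JobQueue.Builder("deferredQueue")
          .setWorkflowConfig(cfg)
          .build();
      driver.createQueue(queue);
      // ... enqueue jobs, verify config, etc., then let it run:
      driver.resume("deferredQueue");
    }
  }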

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 580f5ac..b091748 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -41,6 +41,7 @@ import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -136,14 +137,16 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
     }
   }
 
   @Test
   public void test() throws Exception {
     final int PARALLEL_COUNT = 2;
 
     String queueName = TestHelper.getTestMethodName();
 
-    JobQueue.Builder queueBuild = new JobQueue.Builder(queueName);
-    queueBuild.parallelJobs(PARALLEL_COUNT);
+    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder();
+    cfgBuilder.setParallelJobs(PARALLEL_COUNT);
+
+    JobQueue.Builder queueBuild =
+        new JobQueue.Builder(queueName).setWorkflowConfig(cfgBuilder.build());
     JobQueue queue = queueBuild.build();
     _driver.createQueue(queue);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
index 964f9e1..2e53b36 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -34,6 +34,7 @@ import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
@@ -50,10 +51,8 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -63,7 +62,6 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
   private static final String TIMEOUT_CONFIG = "Timeout";
-  private static final String TGT_DB = "TestDB";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -92,8 +90,9 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
     }
 
     // Set up target db
-    setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
-    setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL);
+    setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@@ -151,27 +150,13 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
   }
 
   @Test
-  public void testUpdateQueueConfig() throws InterruptedException {
+  public void testUpdateRunningQueue() throws InterruptedException {
     String queueName = TestHelper.getTestMethodName();
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000);
-    // Create and Enqueue jobs
-    List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i <= 1; i++) {
-      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
-      JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
-      String jobName = targetPartition.toLowerCase() + "Job" + i;
-      queueBuild.enqueueJob(jobName, jobConfig);
-      currentJobNames.add(jobName);
-    }
-
-    _driver.start(queueBuild.build());
+    JobQueue queue = createDefaultRecurrentJobQueue(queueName, 2);
+    _driver.start(queue);
 
     WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
 
@@ -209,5 +194,75 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
             startTime.get(Calendar.MINUTE) == configStartTime.get(Calendar.MINUTE) &&
             startTime.get(Calendar.SECOND) == configStartTime.get(Calendar.SECOND)));
   }
+
+  @Test
+  public void testUpdateStoppedQueue() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = createDefaultRecurrentJobQueue(queueName, 2);
+    _driver.start(queue);
+
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+
+    // ensure current schedule is started
+    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.IN_PROGRESS);
+
+    _driver.stop(queueName);
+
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
+    Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowConfig);
+    Calendar startTime = Calendar.getInstance();
+    startTime.set(Calendar.SECOND, startTime.get(Calendar.SECOND) + 1);
+
+    ScheduleConfig scheduleConfig =
+        ScheduleConfig.recurringFromDate(startTime.getTime(), TimeUnit.MINUTES, 2);
+
+    configBuilder.setScheduleConfig(scheduleConfig);
+
+    _driver.updateWorkflow(queueName, configBuilder.build());
+
+    workflowConfig = _driver.getWorkflowConfig(queueName);
+    Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+    _driver.resume(queueName);
+
+    // ensure current schedule is completed
+    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+
+    Thread.sleep(1000);
+
+    wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    WorkflowConfig wCfg = _driver.getWorkflowConfig(scheduledQueue);
+
+    Calendar configStartTime = Calendar.getInstance();
+    configStartTime.setTime(wCfg.getStartTime());
+
+    Assert.assertTrue(
+        (startTime.get(Calendar.HOUR_OF_DAY) == configStartTime.get(Calendar.HOUR_OF_DAY) &&
+            startTime.get(Calendar.MINUTE) == configStartTime.get(Calendar.MINUTE) &&
+            startTime.get(Calendar.SECOND) == configStartTime.get(Calendar.SECOND)));
+  }
+
+  private JobQueue createDefaultRecurrentJobQueue(String queueName, int numJobs) {
+    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000);
+    for (int i = 0; i < numJobs; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      String jobName = targetPartition.toLowerCase() + "Job" + i;
+      queueBuild.enqueueJob(jobName, jobConfig);
+    }
+
+    return queueBuild.build();
+  }
 }
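
The stop/update/resume cycle these tests exercise, in one place (sketch only;
the queue name is hypothetical and the driver is assumed to be connected):

  import java.util.Date;
  import java.util.concurrent.TimeUnit;

  import org.apache.helix.task.ScheduleConfig;
  import org.apache.helix.task.TaskDriver;
  import org.apache.helix.task.WorkflowConfig;

  public class UpdateQueueSketch {
    static void reschedule(TaskDriver driver, String queueName) throws Exception {
      driver.stop(queueName);  // target state -> STOP

      WorkflowConfig current = driver.getWorkflowConfig(queueName);
      WorkflowConfig updated = new WorkflowConfig.Builder(current)
          .setScheduleConfig(
              ScheduleConfig.recurringFromDate(new Date(), TimeUnit.MINUTES, 2L))
          .build();
      driver.updateWorkflow(queueName, updated);  // new schedule is persisted

      driver.resume(queueName);  // target state -> START
    }
  }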
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index ce3a36a..639cdff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -31,8 +31,6 @@ import org.apache.log4j.Logger;
  * Convenience class for generating various test workflows
  */
 public class WorkflowGenerator {
-  private static final Logger LOG = Logger.getLogger(WorkflowGenerator.class);
-
   public static final String DEFAULT_TGT_DB = "TestDB";
   public static final String JOB_NAME_1 = "SomeJob1";
   public static final String JOB_NAME_2 = "SomeJob2";
@@ -54,13 +52,6 @@ public class WorkflowGenerator {
     DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
-  private static final JobConfig.Builder DEFAULT_JOB_BUILDER;
-  static {
-    JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
-    builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
-    DEFAULT_JOB_BUILDER = builder;
-  }
-
   public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);