Posted to commits@helix.apache.org by lx...@apache.org on 2016/06/16 23:36:11 UTC

[2/4] helix git commit: [HELIX-623] Do not expose internal configuration field names. Clients should use JobConfig.Builder to create a JobConfig.

[HELIX-623] Do not expose internal configuration field names. Clients should use JobConfig.Builder to create a JobConfig.
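
For context, the client-side pattern this change points at looks roughly like the
following sketch, which mirrors the updated tests further down. The workflow and job
names, the "Reindex" command, and the "Timeout" entry are illustrative values, not
part of this commit:

import com.google.common.collect.ImmutableMap;

import org.apache.helix.task.JobConfig;
import org.apache.helix.task.Workflow;

public class JobConfigBuilderSketch {
  public static Workflow buildExampleWorkflow() {
    // Assemble the job through the typed Builder instead of writing raw
    // configuration field names such as "Command" or "TargetResource".
    JobConfig.Builder jobBuilder = new JobConfig.Builder()
        .setCommand("Reindex")                                      // illustrative command
        .setTargetResource("TestDB")                                // illustrative resource
        .setJobCommandConfigMap(ImmutableMap.of("Timeout", "1000"));

    // Attach the job to a workflow; the workflow name is applied to the
    // job's WorkflowID by the Workflow builder.
    return new Workflow.Builder("exampleWorkflow")
        .addJobConfig("exampleJob", jobBuilder)
        .build();
  }
}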


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79c490fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79c490fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79c490fa

Branch: refs/heads/helix-0.6.x
Commit: 79c490fab080494b68a1f52845c1e708b8881439
Parents: 2409601
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 15:59:37 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/helix/model/ResourceConfig.java  |  61 +++++
 .../java/org/apache/helix/task/JobConfig.java   | 258 +++++++++++--------
 .../java/org/apache/helix/task/TaskDriver.java  |   1 +
 .../java/org/apache/helix/task/TaskUtil.java    |  21 +-
 .../java/org/apache/helix/task/Workflow.java    |  37 +--
 .../task/TestIndependentTaskRebalancer.java     |  93 +++----
 .../integration/task/TestRecurringJobQueue.java |   4 +-
 .../integration/task/TestTaskRebalancer.java    |  39 +--
 .../task/TestTaskRebalancerRetryLimit.java      |  18 +-
 .../task/TestTaskRebalancerStopResume.java      |  10 +-
 .../integration/task/WorkflowGenerator.java     |  56 ++--
 11 files changed, 354 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 98433f5..d58126d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -23,6 +23,9 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.log4j.Logger;
 
+import java.util.Collections;
+import java.util.Map;
+
 /**
  * Resource configurations
  */
@@ -73,6 +76,64 @@ public class ResourceConfig extends HelixProperty {
         .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
   }
 
+  /**
+   * Put a set of simple configs.
+   *
+   * @param configsMap a map of simple configuration keys to values
+   */
+  public void putSimpleConfigs(Map<String, String> configsMap) {
+    getRecord().getSimpleFields().putAll(configsMap);
+  }
+
+  /**
+   * Get all simple configurations.
+   *
+   * @return all simple configurations.
+   */
+  public Map<String, String> getSimpleConfigs() {
+    return Collections.unmodifiableMap(getRecord().getSimpleFields());
+  }
+
+  /**
+   * Put a single simple config value.
+   *
+   * @param configKey the configuration key
+   * @param configVal the configuration value
+   */
+  public void putSimpleConfig(String configKey, String configVal) {
+    getRecord().getSimpleFields().put(configKey, configVal);
+  }
+
+  /**
+   * Get a single simple config value.
+   *
+   * @param configKey
+   * @return the configuration value, or null if it does not exist.
+   */
+  public String getSimpleConfig(String configKey) {
+    return getRecord().getSimpleFields().get(configKey);
+  }
+
+  /**
+   * Put a single map config.
+   *
+   * @param configKey the configuration key
+   * @param configValMap the configuration value map
+   */
+  public void putMapConfig(String configKey, Map<String, String> configValMap) {
+    getRecord().setMapField(configKey, configValMap);
+  }
+
+  /**
+   * Get a single map config.
+   *
+   * @param configKey
+   * @return the configuration value map, or null if it does not exist.
+   */
+  public Map<String, String> getMapConfig(String configKey) {
+    return getRecord().getMapField(configKey);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ResourceConfig) {

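A minimal usage sketch of the accessors added above. How the ResourceConfig instance
is obtained is outside this diff, and the keys and values are illustrative:

import java.util.Collections;
import java.util.Map;

import org.apache.helix.model.ResourceConfig;

class ResourceConfigSketch {
  static void exercise(ResourceConfig config) {
    // Single simple config: a plain key/value pair in the record's simple fields.
    config.putSimpleConfig("owner", "team-helix");
    String owner = config.getSimpleConfig("owner");   // "team-helix", or null if absent

    // Map config: a named map of key/value pairs stored as a map field.
    config.putMapConfig("quota", Collections.singletonMap("maxTasks", "10"));
    Map<String, String> quota = config.getMapConfig("quota");

    // All simple fields at once, returned as an unmodifiable view.
    Map<String, String> all = config.getSimpleConfigs();
  }
}
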
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index c7c2f38..37a2f35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -36,49 +36,87 @@ import com.google.common.collect.Maps;
  * Provides a typed interface to job configurations.
  */
 public class JobConfig {
-  // // Property names ////
-
-  /** The name of the workflow to which the job belongs. */
-  public static final String WORKFLOW_ID = "WorkflowID";
-  /** The assignment strategy of this job */
-  public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
-  /** The name of the target resource. */
-  public static final String TARGET_RESOURCE = "TargetResource";
-  /**
-   * The set of the target partition states. The value must be a comma-separated list of partition
-   * states.
-   */
-  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+
   /**
-   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+   * Do not use this value directly; always use the get/set methods in JobConfig and JobConfig.Builder.
    */
-  public static final String TARGET_PARTITIONS = "TargetPartitions";
-  /** The command that is to be run by participants in the case of identical tasks. */
-  public static final String COMMAND = "Command";
-  /** The command configuration to be used by the tasks. */
-  public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig";
-  /** The timeout for a task. */
-  public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
-  /** The maximum number of times the task rebalancer may attempt to execute a task. */
-  public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
-  /** The maximum number of times Helix will intentionally move a failing task */
-  public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask";
-  /** The number of concurrent tasks that are allowed to run on an instance. */
-  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
-  /** The number of tasks within the job that are allowed to fail. */
-  public static final String FAILURE_THRESHOLD = "FailureThreshold";
-  /** The amount of time in ms to wait before retrying a task */
-  public static final String TASK_RETRY_DELAY = "TaskRetryDelay";
-
-  /** The individual task configurations, if any **/
-  public static final String TASK_CONFIGS = "TaskConfigs";
-
-  /** Disable external view (not showing) for this job resource */
-  public static final String DISABLE_EXTERNALVIEW = "DisableExternalView";
-
-
-  // // Default property values ////
+  protected enum JobConfigProperty {
+    /**
+     * The name of the workflow to which the job belongs.
+     */
+    WORKFLOW_ID("WorkflowID"),
+    /**
+     * The assignment strategy of this job.
+     */
+    ASSIGNMENT_STRATEGY("AssignmentStrategy"),
+    /**
+     * The name of the target resource.
+     */
+    TARGET_RESOURCE("TargetResource"),
+    /**
+     * The set of the target partition states. The value must be a comma-separated list of partition
+     * states.
+     */
+    TARGET_PARTITION_STATES("TargetPartitionStates"),
+    /**
+     * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+     */
+    TARGET_PARTITIONS("TargetPartitions"),
+    /**
+     * The command that is to be run by participants in the case of identical tasks.
+     */
+    COMMAND("Command"),
+    /**
+     * The command configuration to be used by the tasks.
+     */
+    JOB_COMMAND_CONFIG_MAP("JobCommandConfig"),
+    /**
+     * The timeout for a task.
+     */
+    TIMEOUT_PER_TASK("TimeoutPerPartition"),
+    /**
+     * The maximum number of times the task rebalancer may attempt to execute a task.
+     */
+    MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"),
+    /**
+     * The maximum number of times Helix will intentionally move a failing task.
+     */
+    MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"),
+    /**
+     * The number of concurrent tasks that are allowed to run on an instance.
+     */
+    NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"),
+    /**
+     * The number of tasks within the job that are allowed to fail.
+     */
+    FAILURE_THRESHOLD("FailureThreshold"),
+    /**
+     * The amount of time in ms to wait before retrying a task.
+     */
+    TASK_RETRY_DELAY("TaskRetryDelay"),
+
+    /**
+     * The individual task configurations, if any.
+     */
+    TASK_CONFIGS("TaskConfigs"),
+
+    /**
+     * Disable the external view (do not generate it) for this job resource.
+     */
+    DISABLE_EXTERNALVIEW("DisableExternalView");
+
+    private final String _value;
+
+    private JobConfigProperty(String val) {
+      _value = val;
+    }
+
+    public String value() {
+      return _value;
+    }
+  }
 
+  // Default property values
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -106,8 +144,7 @@ public class JobConfig {
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
-      boolean disableExternalView,
-      Map<String, TaskConfig> taskConfigMap) {
+      boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -190,34 +227,39 @@ public class JobConfig {
 
   public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+    cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow);
     if (_command != null) {
-      cfgMap.put(JobConfig.COMMAND, _command);
+      cfgMap.put(JobConfigProperty.COMMAND.value(), _command);
     }
     if (_jobCommandConfigMap != null) {
       String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
       if (serializedConfig != null) {
-        cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig);
+        cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig);
       }
     }
     if (_targetResource != null) {
-      cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+      cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource);
     }
     if (_targetPartitionStates != null) {
-      cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+      cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(),
+          Joiner.on(",").join(_targetPartitionStates));
     }
     if (_targetPartitions != null) {
-      cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+      cfgMap
+          .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions));
     }
     if (_retryDelay > 0) {
-      cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay);
+      cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay);
     }
-    cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
-    cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
-    cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
-    cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
-    cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
-    cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
+    cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask);
+    cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask);
+    cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+        "" + _maxForcedReassignmentsPerTask);
+    cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold);
+    cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(),
+        Boolean.toString(_disableExternalView));
+    cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+        "" + _numConcurrentTasksPerInstance);
     return cfgMap;
   }
 
@@ -251,54 +293,58 @@ public class JobConfig {
 
     /**
      * Convenience method to build a {@link JobConfig} from a {@code Map&lt;String, String&gt;}.
+     *
      * @param cfg A map of property names to their string representations.
      * @return A {@link Builder}.
      */
     public static Builder fromMap(Map<String, String> cfg) {
       Builder b = new Builder();
-      if (cfg.containsKey(WORKFLOW_ID)) {
-        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) {
+        b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value()));
       }
-      if (cfg.containsKey(TARGET_RESOURCE)) {
-        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) {
+        b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value()));
       }
-      if (cfg.containsKey(TARGET_PARTITIONS)) {
-        b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+      if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) {
+        b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value())));
       }
-      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
-        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
-            TARGET_PARTITION_STATES).split(","))));
+      if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) {
+        b.setTargetPartitionStates(new HashSet<String>(
+            Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(","))));
       }
-      if (cfg.containsKey(COMMAND)) {
-        b.setCommand(cfg.get(COMMAND));
+      if (cfg.containsKey(JobConfigProperty.COMMAND.value())) {
+        b.setCommand(cfg.get(JobConfigProperty.COMMAND.value()));
       }
-      if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) {
-        Map<String, String> commandConfigMap =
-            TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP));
+      if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) {
+        Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap(
+            cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value()));
         b.setJobCommandConfigMap(commandConfigMap);
       }
-      if (cfg.containsKey(TIMEOUT_PER_TASK)) {
-        b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+      if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) {
+        b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value())));
       }
-      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
-        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
-            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) {
+        b.setNumConcurrentTasksPerInstance(
+            Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())));
       }
-      if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
-        b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+      if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) {
+        b.setMaxAttemptsPerTask(
+            Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())));
       }
-      if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) {
-        b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg
-            .get(MAX_FORCED_REASSIGNMENTS_PER_TASK)));
+      if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) {
+        b.setMaxForcedReassignmentsPerTask(
+            Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())));
       }
-      if (cfg.containsKey(FAILURE_THRESHOLD)) {
-        b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+      if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) {
+        b.setFailureThreshold(
+            Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value())));
       }
-      if (cfg.containsKey(TASK_RETRY_DELAY)) {
-        b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
+      if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) {
+        b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value())));
       }
-      if (cfg.containsKey(DISABLE_EXTERNALVIEW)) {
-        b.setDisableExternalView(Boolean.valueOf(cfg.get(DISABLE_EXTERNALVIEW)));
+      if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) {
+        b.setDisableExternalView(
+            Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
       }
       return b;
     }
@@ -384,38 +430,46 @@ public class JobConfig {
 
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+        throw new IllegalArgumentException(
+            String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE));
       }
-      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
-          && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
-            TARGET_PARTITION_STATES));
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
+          .isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES));
       }
       if (_taskConfigMap.isEmpty() && _command == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+        throw new IllegalArgumentException(
+            String.format("%s cannot be null", JobConfigProperty.COMMAND));
       }
       if (_timeoutPerTask < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            TIMEOUT_PER_TASK, _timeoutPerTask));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK,
+                _timeoutPerTask));
       }
       if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+                _numConcurrentTasksPerInstance));
       }
       if (_maxAttemptsPerTask < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK,
+                _maxAttemptsPerTask));
       }
       if (_maxForcedReassignmentsPerTask < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+                _maxForcedReassignmentsPerTask));
       }
       if (_failureThreshold < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            FAILURE_THRESHOLD, _failureThreshold));
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD,
+                _failureThreshold));
       }
       if (_workflow == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+        throw new IllegalArgumentException(
+            String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID));
       }
     }
 

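With the field names moved behind JobConfigProperty, the supported way to convert
between a JobConfig and its raw String map is getResourceConfigMap() paired with
Builder.fromMap(), as in this sketch:

import java.util.Map;

import org.apache.helix.task.JobConfig;

class JobConfigRoundTripSketch {
  static JobConfig.Builder rebuild(JobConfig jobConfig) {
    // Serialize to the raw map; the keys are the JobConfigProperty values.
    Map<String, String> raw = jobConfig.getResourceConfigMap();
    // Rebuild a Builder from that map rather than touching field names directly.
    return JobConfig.Builder.fromMap(raw);
  }
}
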
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 9b64aec..c4986ee 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -55,6 +55,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d804fab..524b889 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -402,10 +402,10 @@ public class TaskUtil {
     Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
     JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
     Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
 
     // Set the workflow expiry
-    builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+    workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
 
     // Set the schedule, if applicable
     ScheduleConfig scheduleConfig;
@@ -415,7 +415,7 @@ public class TaskUtil {
       scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
     }
     if (scheduleConfig != null) {
-      builder.setScheduleConfig(scheduleConfig);
+      workflowBuilder.setScheduleConfig(scheduleConfig);
     }
 
     // Add each job back as long as the original exists
@@ -426,29 +426,30 @@ public class TaskUtil {
         String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
         HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
         Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-        jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
-        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
-          builder.addConfig(job, e.getKey(), e.getValue());
-        }
+
+        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
+
+        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
         Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
         List<TaskConfig> taskConfigs = Lists.newLinkedList();
         for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
           TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
           taskConfigs.add(taskConfig);
         }
-        builder.addTaskConfigs(job, taskConfigs);
+        jobCfgBuilder.addTaskConfigs(taskConfigs);
+        workflowBuilder.addJobConfig(job, jobCfgBuilder);
 
         // Add dag dependencies
         Set<String> children = parentsToChildren.get(namespacedJob);
         if (children != null) {
           for (String namespacedChild : children) {
             String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
-            builder.addParentChildDependency(job, child);
+            workflowBuilder.addParentChildDependency(job, child);
           }
         }
       }
     }
-    return builder.build();
+    return workflowBuilder.build();
   }
 
   private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 8ea2691..3a050c2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -128,7 +128,9 @@ public class Workflow {
     return parse(new StringReader(yaml));
   }
 
-  /** Helper function to parse workflow from a generic {@link Reader} */
+  /**
+   * Helper function to parse workflow from a generic {@link Reader}
+   */
   private static Workflow parse(Reader reader) throws Exception {
     Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
@@ -146,29 +148,32 @@ public class Workflow {
           }
         }
 
-        builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
-        builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name);
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command);
         if (job.jobConfigMap != null) {
           builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
         }
-        builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(),
+            job.targetResource);
         if (job.targetPartitionStates != null) {
-          builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+          builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(),
               Joiner.on(",").join(job.targetPartitionStates));
         }
         if (job.targetPartitions != null) {
-          builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+          builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(),
               Joiner.on(",").join(job.targetPartitions));
         }
-        builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(),
             String.valueOf(job.maxAttemptsPerTask));
-        builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+        builder.addConfig(job.name,
+            JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
             String.valueOf(job.maxForcedReassignmentsPerTask));
-        builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+        builder.addConfig(job.name,
+            JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
             String.valueOf(job.numConcurrentTasksPerInstance));
-        builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(),
             String.valueOf(job.timeoutPerPartition));
-        builder.addConfig(job.name, JobConfig.FAILURE_THRESHOLD,
+        builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(),
             String.valueOf(job.failureThreshold));
         if (job.tasks != null) {
           List<TaskConfig> taskConfigs = Lists.newArrayList();
@@ -242,7 +247,7 @@ public class Workflow {
       _expiry = -1;
     }
 
-    public Builder addConfig(String job, String key, String val) {
+    private Builder addConfig(String job, String key, String val) {
       job = namespacify(job);
       _dag.addNode(job);
       if (!_jobConfigs.containsKey(job)) {
@@ -252,8 +257,8 @@ public class Workflow {
       return this;
     }
 
-    public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
-      return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP,
+    private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
+      return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(),
           TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
     }
 
@@ -268,7 +273,7 @@ public class Workflow {
       return this;
     }
 
-    public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+    private Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
       job = namespacify(job);
       _dag.addNode(job);
       if (!_taskConfigs.containsKey(job)) {
@@ -322,7 +327,7 @@ public class Workflow {
     protected WorkflowConfig.Builder buildWorkflowConfig() {
       for (String task : _jobConfigs.keySet()) {
         // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
-        _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
+        _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name);
       }
 
       WorkflowConfig.Builder builder;

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index a00a736..40c2485 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -140,8 +140,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _runCounts.clear();
   }
 
-  @Test
-  public void testDifferentTasks() throws Exception {
+  @Test public void testDifferentTasks() throws Exception {
     // Create a job with two different tasks
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -150,11 +149,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -166,8 +166,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test
-  public void testThresholdFailure() throws Exception {
+  @Test public void testThresholdFailure() throws Exception {
     // Create a job with two different tasks
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -177,12 +176,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1)
+            .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -194,8 +193,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test
-  public void testOptionalTaskFailure() throws Exception {
+  @Test public void testOptionalTaskFailure() throws Exception {
     // Create a job with two different tasks
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -205,11 +203,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
     taskConfigs.add(taskConfig1);
     taskConfigs.add(taskConfig2);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -221,24 +222,23 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
   }
 
-  @Test
-  public void testReassignment() throws Exception {
+  @Test public void testReassignment() throws Exception {
     final int NUM_INSTANCES = 2;
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    Map<String, String> taskConfigMap =
-        Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_'
-            + START_PORT));
+    Map<String, String> taskConfigMap = Maps.newHashMap(
+        ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, ""
-        + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -251,8 +251,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     // Ensure that this was tried on two different instances, the first of which exhausted the
     // attempts number, and the other passes on the first try
     Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
-    Assert.assertTrue(_runCounts.values().contains(
-        JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
+    Assert.assertTrue(
+        _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
     Assert.assertTrue(_runCounts.values().contains(1));
   }
 
@@ -264,11 +264,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, String> taskConfigMap = Maps.newHashMap();
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
     workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
     _driver.start(workflowBuilder.build());
@@ -295,11 +298,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, String> taskConfigMap = Maps.newHashMap();
     TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
-    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
-    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
-    workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay));
-    Map<String, String> jobConfigMap = Maps.newHashMap();
-    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJobConfig(jobName, jobBuilder);
+
     SingleFailTask.hasFailed = false;
     _driver.start(workflowBuilder.build());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 79adcd5..da13ada 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -215,12 +215,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
-      JobConfig.Builder job =
+      JobConfig.Builder jobConfig =
           new JobConfig.Builder().setCommand("Reindex")
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
-      queueBuild.enqueueJob(jobName, job);
+      queueBuild.enqueueJob(jobName, jobConfig);
       currentJobNames.add(jobName);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f402b82..3352d1c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -163,9 +163,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     String jobName = "Expiry";
     long expiry = 1000;
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
-    Workflow flow =
-        WorkflowGenerator
-            .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig);
+
+    Workflow flow = WorkflowGenerator
+            .generateSingleJobWorkflowBuilder(jobName, jobBuilder)
             .setExpiry(expiry).build();
 
     _driver.start(flow);
@@ -204,9 +206,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     final String jobResource = "basic" + jobCompletionTime;
     Map<String, String> commandConfig =
         ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            commandConfig).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
     // Wait for job completion
@@ -220,18 +225,20 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     }
   }
 
-  @Test
-  public void partitionSet() throws Exception {
+  @Test public void partitionSet() throws Exception {
     final String jobResource = "partitionSet";
     ImmutableList<String> targetPartitions =
         ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
 
     // construct and submit our basic workflow
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig).setMaxAttemptsPerTask(1)
+        .setTargetPartitions(targetPartitions);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
-            JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
     // wait for job completeness/timeout
@@ -268,13 +275,15 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     }
   }
 
-  @Test
-  public void timeouts() throws Exception {
+  @Test public void timeouts() throws Exception {
     final String jobResource = "timeouts";
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+        .setMaxAttemptsPerTask(2).setTimeoutPerTask(100);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
-            String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
     // Wait until the job reports failure.

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index b678d7e..efe90b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -125,18 +125,15 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
-  @Test
-  public void test() throws Exception {
+  @Test public void test() throws Exception {
     String jobResource = TestHelper.getTestMethodName();
+
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+        .setMaxAttemptsPerTask(2).setCommand("ErrorTask").setFailureThreshold(Integer.MAX_VALUE);
+
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
-            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
-            String.valueOf(2)).build();
-    Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs();
-    for (Map<String, String> jobConfig : jobConfigs.values()) {
-      jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
-      jobConfig.put(JobConfig.COMMAND, "ErrorTask");
-    }
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
 
     _driver.start(flow);
 
@@ -151,7 +148,6 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
         Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2);
       }
     }
-
   }
 
   private static class ErrorTask implements Task {

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 8a44672..7437b72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -162,12 +162,14 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
-  @Test
-  public void stopAndResume() throws Exception {
+  @Test public void stopAndResume() throws Exception {
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+    JobConfig.Builder jobBuilder =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(commandConfig);
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
-            commandConfig).build();
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(JOB_RESOURCE, jobBuilder).build();
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);

http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index a414f5c..23c35af 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -56,58 +56,34 @@ public class WorkflowGenerator {
     DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
-  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
-      String jobName, Map<String, String> commandConfig, String... cfgs) {
-    if (cfgs.length % 2 != 0) {
-      throw new IllegalArgumentException(
-          "Additional configs should have even number of keys and values");
-    }
-    Workflow.Builder bldr = generateSingleJobWorkflowBuilder(jobName, commandConfig, DEFAULT_JOB_CONFIG);
-    for (int i = 0; i < cfgs.length; i += 2) {
-      bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
-    }
-
-    return bldr;
+  private static final JobConfig.Builder DEFAULT_JOB_BUILDER;
+  static {
+    JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+    DEFAULT_JOB_BUILDER = builder;
   }
 
   public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
-    return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+    return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
   }
 
   public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
-      Map<String, String> commandConfig, Map<String, String> config) {
-    Workflow.Builder builder = new Workflow.Builder(jobName);
-    for (String key : config.keySet()) {
-      builder.addConfig(jobName, key, config.get(key));
-    }
-    if (commandConfig != null) {
-      ObjectMapper mapper = new ObjectMapper();
-      try {
-        String serializedMap = mapper.writeValueAsString(commandConfig);
-        builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
-      } catch (IOException e) {
-        LOG.error("Error serializing " + commandConfig, e);
-      }
-    }
-    return builder;
+      JobConfig.Builder jobBuilder) {
+    return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder);
   }
 
   public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
     Workflow.Builder builder = new Workflow.Builder(workflowName);
     builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
 
-    for (String key : DEFAULT_JOB_CONFIG.keySet()) {
-      builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
-      builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
-    }
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
-      builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
-      builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
-    } catch (IOException e) {
-      LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
-    }
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+    builder.addJobConfig(JOB_NAME_1, jobBuilder);
+    builder.addJobConfig(JOB_NAME_2, jobBuilder);
+
     return builder;
   }
 }
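
For reference, the updated generator is exercised the same way across the tests
above; a condensed sketch, with an illustrative job name and overrides:

import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.Workflow;

class GeneratorUsageSketch {
  static Workflow exampleFlow() {
    // Start from the shared default job config and layer overrides on top.
    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
    jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
        .setMaxAttemptsPerTask(2);
    // One-job workflow named after the job itself.
    return WorkflowGenerator.generateSingleJobWorkflowBuilder("exampleJob", jobBuilder).build();
  }
}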