You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/17 22:39:41 UTC

helix git commit: [HELIX-738] Remove quotaType APIs and make jobs inherit type from workflows

Repository: helix
Updated Branches:
  refs/heads/master 8cd7aee6f -> 36ab2a602


[HELIX-738] Remove quotaType APIs and make jobs inherit type from workflows

For quota-based task scheduling, for each job, we provided get/setQuotaType APIs. However, the use case for workflow types and job types were similar enough that we decided to merge them and begin using workflow/job types for quota-based scheduling. Job types will now be used as quota types, and all jobs will inherit the type, if set, from their parent workflow, at assignment and schedule time.

Changelist:
1. Remove APIs around quotaType in Workflow/JobConfig
2. Add an internal method in TaskAssignmentCalculator that includes logic for determining which quota type each job should use
3. Adjust tests so that they test and pass successfully


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/36ab2a60
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/36ab2a60
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/36ab2a60

Branch: refs/heads/master
Commit: 36ab2a6028dad39b32d3a15da942b4385ff9fd1d
Parents: 8cd7aee
Author: Hunter Lee <na...@gmail.com>
Authored: Tue Jul 17 13:36:18 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Tue Jul 17 15:37:40 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      |  8 ++--
 .../helix/task/AssignableInstanceManager.java   |  2 +-
 .../FixedTargetTaskAssignmentCalculator.java    | 13 +++---
 .../java/org/apache/helix/task/JobConfig.java   | 46 ++++++--------------
 .../helix/task/TaskAssignmentCalculator.java    | 23 ++++++++++
 ...hreadCountBasedTaskAssignmentCalculator.java |  2 +-
 .../org/apache/helix/task/WorkflowConfig.java   | 44 +++----------------
 .../task/TestJobAndWorkflowType.java            | 13 +++---
 .../task/TestQuotaBasedScheduling.java          | 38 ++++++++--------
 .../task/TestWorkflowTermination.java           |  2 +-
 10 files changed, 79 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 36e5698..eb67d59 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -94,7 +94,7 @@ public abstract class AbstractTaskDispatcher {
         }
 
         // Get AssignableInstance for this instance and TaskConfig for releasing resources
-        String quotaType = jobCfg.getQuotaType();
+        String quotaType = jobCfg.getJobType();
         AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
         String taskId;
         if (TaskUtil.isGenericTaskJob(jobCfg)) {
@@ -483,7 +483,7 @@ public abstract class AbstractTaskDispatcher {
         }
         AssignableInstance assignableInstance =
             cache.getAssignableInstanceManager().getAssignableInstanceMap().get(instance);
-        String quotaType = jobCfg.getQuotaType();
+        String quotaType = jobCfg.getJobType();
         for (int partitionNum : tgtPartitionAssignments.get(instance)) {
           // Get the TaskConfig for this partitionNumber
           String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
@@ -547,7 +547,7 @@ public abstract class AbstractTaskDispatcher {
         }
         AssignableInstance assignableInstance =
             cache.getAssignableInstanceManager().getAssignableInstanceMap().get(instance);
-        String quotaType = jobCfg.getQuotaType();
+        String quotaType = jobCfg.getJobType();
         for (int partitionNum : throttledSet) {
           // Get the TaskConfig for this partitionNumber
           String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
@@ -798,7 +798,7 @@ public abstract class AbstractTaskDispatcher {
                 Iterable<AssignableInstance> assignableInstances = clusterDataCache
                     .getAssignableInstanceManager().getAssignableInstanceMap().values();
                 JobConfig jobConfig = jobConfigMap.get(jobToFail);
-                String quotaType = jobConfig.getQuotaType();
+                String quotaType = jobConfig.getJobType();
                 Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
                 // Iterate over all tasks and release them
                 for (Map.Entry<String, TaskConfig> taskEntry : taskConfigMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 701e444..e25e23a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -99,7 +99,7 @@ public class AssignableInstanceManager {
             jobName, jobConfig, jobContext);
         continue; // Ignore this job if either the config or context is null
       }
-      String quotaType = jobConfig.getQuotaType();
+      String quotaType = jobConfig.getJobType();
       Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer represents a task in
       // this job (this is NOT taskId)
       for (int taskIndex : taskIndices) {

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index e7dd959..87b57c7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -83,8 +83,8 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
-    return computeAssignmentAndChargeResource(currStateOutput, prevAssignment, instances, jobCfg,
-        jobContext, partitionSet, idealStateMap);
+    return computeAssignmentAndChargeResource(currStateOutput, prevAssignment, instances,
+        workflowCfg, jobCfg, jobContext, partitionSet, idealStateMap);
   }
 
   /**
@@ -187,17 +187,14 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    */
   private Map<String, SortedSet<Integer>> computeAssignmentAndChargeResource(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, JobConfig jobCfg, JobContext jobContext,
-      Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
+      Collection<String> liveInstances, WorkflowConfig workflowCfg, JobConfig jobCfg,
+      JobContext jobContext, Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
 
     // Note: targeted jobs also take up capacity in quota-based scheduling
     // "Charge" resources for the tasks
     Map<String, AssignableInstance> assignableInstanceMap =
         _assignableInstanceManager.getAssignableInstanceMap();
-    String quotaType = jobCfg.getQuotaType();
-    if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) {
-      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
-    }
+    String quotaType = getQuotaType(workflowCfg, jobCfg);
 
     // IdealState of the target resource
     IdealState targetIdealState = idealStateMap.get(jobCfg.getTargetResource());

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index d394931..b4fa131 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -154,11 +154,6 @@ public class JobConfig extends ResourceConfig {
      * Whether or not enable running task rebalance
      */
     RebalanceRunningTask,
-
-    /**
-     * Quota type for this job used for quota-based scheduling
-     */
-    QuotaType
   }
 
   // Default property values
@@ -192,7 +187,7 @@ public class JobConfig extends ResourceConfig {
         jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(),
         jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(),
         jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(),
-        jobConfig.isRebalanceRunningTask(), jobConfig.getQuotaType());
+        jobConfig.isRebalanceRunningTask());
   }
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
@@ -202,7 +197,7 @@ public class JobConfig extends ResourceConfig {
       boolean disableExternalView, boolean ignoreDependentJobFailure,
       Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
       long executionDelay, long executionStart, String jobId, long expiry,
-      boolean rebalanceRunningTask, String quotaType) {
+      boolean rebalanceRunningTask) {
     super(jobId);
     putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow);
     putSimpleConfig(JobConfigProperty.JobID.name(), jobId);
@@ -266,9 +261,6 @@ public class JobConfig extends ResourceConfig {
         String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
     getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
         rebalanceRunningTask);
-    if (quotaType != null) {
-      putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType);
-    }
   }
 
   public String getWorkflow() {
@@ -293,7 +285,7 @@ public class JobConfig extends ResourceConfig {
 
   public Set<String> getTargetPartitionStates() {
     if (simpleConfigContains(JobConfigProperty.TargetPartitionStates.name())) {
-      return new HashSet<String>(Arrays
+      return new HashSet<>(Arrays
           .asList(getSimpleConfig(JobConfigProperty.TargetPartitionStates.name()).split(",")));
     }
     return null;
@@ -408,6 +400,11 @@ public class JobConfig extends ResourceConfig {
     return getSimpleConfigs();
   }
 
+  /**
+   * Returns the job type for this job. This type will be used as a quota type for quota-based
+   * scheduling.
+   * @return quota type. null if quota type is not set
+   */
   public String getJobType() {
     return getSimpleConfig(JobConfigProperty.JobType.name());
   }
@@ -425,14 +422,6 @@ public class JobConfig extends ResourceConfig {
         DEFAULT_REBALANCE_RUNNING_TASK);
   }
 
-  /**
-   * Returns the quota type for this job.
-   * @return quota type. null if quota type is not set
-   */
-  public String getQuotaType() {
-    return getSimpleConfig(JobConfigProperty.QuotaType.name());
-  }
-
   public static JobConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -467,7 +456,6 @@ public class JobConfig extends ResourceConfig {
     private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
     private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
     private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK;
-    private String _quotaType;
 
     public JobConfig build() {
       if (_targetResource == null && _taskConfigMap.isEmpty()) {
@@ -487,7 +475,7 @@ public class JobConfig extends ResourceConfig {
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
           _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
-          _rebalanceRunningTask, _quotaType);
+          _rebalanceRunningTask);
     }
 
     /**
@@ -510,7 +498,7 @@ public class JobConfig extends ResourceConfig {
         b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TargetPartitions.name())));
       }
       if (cfg.containsKey(JobConfigProperty.TargetPartitionStates.name())) {
-        b.setTargetPartitionStates(new HashSet<String>(
+        b.setTargetPartitionStates(new HashSet<>(
             Arrays.asList(cfg.get(JobConfigProperty.TargetPartitionStates.name()).split(","))));
       }
       if (cfg.containsKey(JobConfigProperty.Command.name())) {
@@ -568,9 +556,6 @@ public class JobConfig extends ResourceConfig {
         b.setRebalanceRunningTask(
             Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
       }
-      if (cfg.containsKey(JobConfigProperty.QuotaType.name())) {
-        b.setQuotaType(cfg.get(JobConfigProperty.QuotaType.name()));
-      }
       return b;
     }
 
@@ -705,11 +690,6 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
-    public Builder setQuotaType(String quotaType) {
-      _quotaType = quotaType;
-      return this;
-    }
-
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
@@ -736,7 +716,7 @@ public class JobConfig extends ResourceConfig {
         for (TaskConfig taskConfig : _taskConfigMap.values()) {
           if (taskConfig.getCommand() == null) {
             throw new IllegalArgumentException(
-                String.format("Task % command cannot be null", taskConfig.getId()));
+                String.format("Task %s command cannot be null", taskConfig.getId()));
           }
         }
       }
@@ -781,7 +761,7 @@ public class JobConfig extends ResourceConfig {
           .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure)
           .setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay)
           .setExecutionStart(jobBean.executionStart)
-          .setRebalanceRunningTask(jobBean.rebalanceRunningTask).setQuotaType(jobBean.quotaType);
+          .setRebalanceRunningTask(jobBean.rebalanceRunningTask);
 
       if (jobBean.jobCommandConfigMap != null) {
         b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
@@ -793,7 +773,7 @@ public class JobConfig extends ResourceConfig {
         b.setTargetResource(jobBean.targetResource);
       }
       if (jobBean.targetPartitionStates != null) {
-        b.setTargetPartitionStates(new HashSet<String>(jobBean.targetPartitionStates));
+        b.setTargetPartitionStates(new HashSet<>(jobBean.targetPartitionStates));
       }
       if (jobBean.targetPartitions != null) {
         b.setTargetPartitions(jobBean.targetPartitions);

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 8286257..dc90502 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -8,6 +8,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
+import org.apache.helix.task.assigner.AssignableInstance;
+
 
 public abstract class TaskAssignmentCalculator {
   /**
@@ -43,4 +45,25 @@ public abstract class TaskAssignmentCalculator {
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
       Map<String, IdealState> idealStateMap);
+
+  /**
+   * Returns the correct type for this job. Note that if the parent workflow has a type, then all of
+   * its jobs will inherit the type from the workflow.
+   * @param workflowConfig
+   * @param jobConfig
+   * @return
+   */
+  String getQuotaType(WorkflowConfig workflowConfig, JobConfig jobConfig) {
+    String workflowType = workflowConfig.getWorkflowType();
+    if (workflowType == null || workflowType.equals("")) {
+      // Workflow type is null, so we go by the job type
+      String jobType = jobConfig.getJobType();
+      if (jobType == null || jobType.equals("")) {
+        // Job type is null, so we use DEFAULT
+        return AssignableInstance.DEFAULT_QUOTA_TYPE;
+      }
+      return jobType;
+    }
+    return workflowType;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index 56221eb..940747e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -94,7 +94,7 @@ public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalc
     Iterable<TaskConfig> taskConfigs = getFilteredTaskConfigs(partitionSet, jobCfg, jobContext);
 
     // Get the quota type to assign tasks to
-    String quotaType = jobCfg.getQuotaType();
+    String quotaType = getQuotaType(workflowCfg, jobCfg);
 
     // Assign tasks to AssignableInstances
     Map<String, TaskAssignResult> taskAssignResultMap =

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 661615d..20da614 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -65,10 +65,7 @@ public class WorkflowConfig extends ResourceConfig {
     JobPurgeInterval,
     /* Allow multiple jobs in this workflow to be assigned to a same instance or not */
     AllowOverlapJobAssignment,
-    Timeout,
-    /* Quota related fields */
-    QuotaType // Quota type for workflows is a syntactic sugar for setting
-    // all of the jobs to this quota type
+    Timeout
   }
 
   /* Default values */
@@ -93,7 +90,7 @@ public class WorkflowConfig extends ResourceConfig {
     this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
         cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
         cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.getJobPurgeInterval(),
-        cfg.isAllowOverlapJobAssignment(), cfg.getTimeout(), cfg.getQuotaType());
+        cfg.isAllowOverlapJobAssignment(), cfg.getTimeout());
   }
 
   /* Member variables */
@@ -103,7 +100,7 @@ public class WorkflowConfig extends ResourceConfig {
       TargetState targetState, long expiry, int failureThreshold, boolean terminable,
       ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue,
       Map<String, String> jobTypes, long purgeInterval, boolean allowOverlapJobAssignment,
-      long timeout, String quotaType) {
+      long timeout) {
     super(workflowId);
 
     putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
@@ -154,11 +151,6 @@ public class WorkflowConfig extends ResourceConfig {
     }
     putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
         String.valueOf(DEFAULT_MONITOR_DISABLE));
-
-    // Set the quota type for this workflow
-    if (quotaType != null) {
-      putSimpleConfig(WorkflowConfigProperty.QuotaType.name(), quotaType);
-    }
   }
 
   public String getWorkflowId() {
@@ -318,14 +310,6 @@ public class WorkflowConfig extends ResourceConfig {
     return null;
   }
 
-  /**
-   * Returns the quota type set for this workflow.
-   * @return quotaType string, null if quota type is not set
-   */
-  public String getQuotaType() {
-    return getSimpleConfig(WorkflowConfigProperty.QuotaType.name());
-  }
-
   public static WorkflowConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -352,14 +336,13 @@ public class WorkflowConfig extends ResourceConfig {
     private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
     private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
     private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT;
-    private String _quotaType = null;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry,
           _failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue,
-          _jobTypes, _jobPurgeInterval, _allowOverlapJobAssignment, _timeout, _quotaType);
+          _jobTypes, _jobPurgeInterval, _allowOverlapJobAssignment, _timeout);
     }
 
     public Builder() {
@@ -385,7 +368,6 @@ public class WorkflowConfig extends ResourceConfig {
       _jobPurgeInterval = workflowConfig.getJobPurgeInterval();
       _allowOverlapJobAssignment = workflowConfig.isAllowOverlapJobAssignment();
       _timeout = workflowConfig.getTimeout();
-      _quotaType = workflowConfig.getQuotaType();
     }
 
     public Builder setWorkflowId(String v) {
@@ -508,17 +490,6 @@ public class WorkflowConfig extends ResourceConfig {
       return this;
     }
 
-    /**
-     * Set the quota type for this workflow. If a workflow has a quota type set,
-     * all of its jobs will be of that quota type.
-     * @param quotaType
-     * @return
-     */
-    public Builder setQuotaType(String quotaType) {
-      _quotaType = quotaType;
-      return this;
-    }
-
     @Deprecated
     public static Builder fromMap(Map<String, String> cfg) {
       Builder builder = new Builder();
@@ -603,10 +574,6 @@ public class WorkflowConfig extends ResourceConfig {
         setTimeout(Long.parseLong(cfg.get(WorkflowConfigProperty.Timeout.name())));
       }
 
-      if (cfg.containsKey(WorkflowConfigProperty.QuotaType.name())) {
-        setQuotaType(cfg.get(WorkflowConfigProperty.QuotaType.name()));
-      }
-
       return this;
     }
 
@@ -656,7 +623,6 @@ public class WorkflowConfig extends ResourceConfig {
         b.setScheduleConfig(ScheduleConfig.from(workflowBean.schedule));
       }
       b.setExpiry(workflowBean.expiry);
-      b.setQuotaType(workflowBean.quotaType);
 
       return b;
     }
@@ -674,4 +640,4 @@ public class WorkflowConfig extends ResourceConfig {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
index 86a588f..e01eaef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
@@ -32,16 +32,17 @@ import org.testng.annotations.Test;
 
 public class TestJobAndWorkflowType extends TaskTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestJobAndWorkflowType.class);
+  private static final String DEFAULT_TYPE = "DEFAULT";
 
   @Test
   public void testJobAndWorkflowType() throws InterruptedException {
     LOG.info("Start testing job and workflow type");
     String jobName = TestHelper.getTestMethodName();
     JobConfig.Builder jobConfig = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setJobType("JobTestType");
+        .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setJobType(DEFAULT_TYPE);
 
-    Map<String, String> tmp = new HashMap<String, String>();
-    tmp.put("WorkflowType", "WorkflowTestType");
+    Map<String, String> tmp = new HashMap<>();
+    tmp.put("WorkflowType", DEFAULT_TYPE);
     Workflow.Builder builder =
         WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName, jobConfig).fromMap(tmp);
 
@@ -54,7 +55,7 @@ public class TestJobAndWorkflowType extends TaskTestBase {
     String fetchedWorkflowType =
         _driver.getWorkflowConfig(jobName).getWorkflowType();
 
-    Assert.assertEquals(fetchedJobType, "JobTestType");
-    Assert.assertEquals(fetchedWorkflowType, "WorkflowTestType");
+    Assert.assertEquals(fetchedJobType, DEFAULT_TYPE);
+    Assert.assertEquals(fetchedWorkflowType, DEFAULT_TYPE);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index 8dd24db..e2da7d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -187,7 +187,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
       List<TaskConfig> taskConfigs = new ArrayList<>();
       taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
       JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
-          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setQuotaType("A");
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A");
       workflowBuilder.addJob("JOB" + i, jobConfigBulider);
     }
 
@@ -195,7 +195,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
       List<TaskConfig> taskConfigs = new ArrayList<>();
       taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
       JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
-          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setQuotaType("B");
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("B");
       workflowBuilder.addJob("JOB" + i, jobConfigBulider);
     }
 
@@ -247,7 +247,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
     JobConfig.Builder jobBuilderA =
         new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
-            .addTaskConfigs(taskConfigsA).setQuotaType("A").setNumConcurrentTasksPerInstance(20);
+            .addTaskConfigs(taskConfigsA).setJobType("A").setNumConcurrentTasksPerInstance(20);
     workflowBuilder.addJob("JOB_A", jobBuilderA);
 
     // JOB_B
@@ -258,7 +258,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
     JobConfig.Builder jobBuilderB =
         new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
-            .addTaskConfigs(taskConfigsB).setQuotaType("B").setNumConcurrentTasksPerInstance(20);
+            .addTaskConfigs(taskConfigsB).setJobType("B").setNumConcurrentTasksPerInstance(20);
     workflowBuilder.addJob("JOB_B", jobBuilderB);
 
     // JOB_C
@@ -269,7 +269,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
     JobConfig.Builder jobBuilderC =
         new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
-            .addTaskConfigs(taskConfigsC).setQuotaType("C").setNumConcurrentTasksPerInstance(20);
+            .addTaskConfigs(taskConfigsC).setJobType("C").setNumConcurrentTasksPerInstance(20);
     workflowBuilder.addJob("JOB_C", jobBuilderC);
 
     _driver.start(workflowBuilder.build());
@@ -315,7 +315,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
     JobConfig.Builder jobBuilderA =
         new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
-            .addTaskConfigs(taskConfigsA).setQuotaType("A").setNumConcurrentTasksPerInstance(50);
+            .addTaskConfigs(taskConfigsA).setJobType("A").setNumConcurrentTasksPerInstance(50);
     workflowBuilder.addJob("JOB_A", jobBuilderA);
 
     // JOB_B
@@ -326,7 +326,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
     JobConfig.Builder jobBuilderB =
         new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
-            .addTaskConfigs(taskConfigsB).setQuotaType("B").setNumConcurrentTasksPerInstance(50);
+            .addTaskConfigs(taskConfigsB).setJobType("B").setNumConcurrentTasksPerInstance(50);
     workflowBuilder.addJob("JOB_B", jobBuilderB);
 
     // JOB_C (DEFAULT type)
@@ -337,7 +337,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
     JobConfig.Builder jobBuilderC = new JobConfig.Builder().setCommand(JOB_COMMAND)
         .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsC)
-        .setQuotaType(DEFAULT_QUOTA_TYPE).setNumConcurrentTasksPerInstance(50);
+        .setJobType(DEFAULT_QUOTA_TYPE).setNumConcurrentTasksPerInstance(50);
     workflowBuilder.addJob("JOB_DEFAULT", jobBuilderC);
 
     _driver.start(workflowBuilder.build());
@@ -391,7 +391,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
       taskConfigsA.add(new TaskConfig("ShortTask", taskConfigMap));
     }
     JobConfig.Builder jobBuilderA = new JobConfig.Builder().setCommand(JOB_COMMAND)
-        .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsA).setQuotaType("A");
+        .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsA).setJobType("A");
     workflowBuilder.addJob("JOB_A", jobBuilderA);
 
     // JOB_B
@@ -401,7 +401,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
       taskConfigsB.add(new TaskConfig("ShortTask", taskConfigMap));
     }
     JobConfig.Builder jobBuilderB = new JobConfig.Builder().setCommand(JOB_COMMAND)
-        .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsB).setQuotaType("B");
+        .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsB).setJobType("B");
     workflowBuilder.addJob("JOB_B", jobBuilderB);
 
     _driver.start(workflowBuilder.build());
@@ -504,7 +504,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     List<TaskConfig> taskConfigs = new ArrayList<>();
     taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
     JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
-        .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setQuotaType("A");
+        .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A");
 
     for (int i = 0; i < 5; i++) {
       String jobName = "JOB_" + i;
@@ -521,7 +521,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     taskConfigs = new ArrayList<>();
     taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
     jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs)
-        .setJobCommandConfigMap(_jobCommandMap).setQuotaType("B");
+        .setJobCommandConfigMap(_jobCommandMap).setJobType("B");
 
     for (int i = 5; i < 10; i++) {
       String jobName = "JOB_" + i;
@@ -568,7 +568,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
       }
       JobConfig.Builder jobBuilder =
           new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
-              .addTaskConfigs(taskConfigs).setQuotaType(quotaType);
+              .addTaskConfigs(taskConfigs).setJobType(quotaType);
       workflowBuilder.addJob(jobName, jobBuilder);
     }
     return workflowBuilder.build();
@@ -581,10 +581,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     private final String _instanceName;
     private final String _quotaType;
 
-    public ShortTask(TaskCallbackContext context, String instanceName) {
+    ShortTask(TaskCallbackContext context, String instanceName) {
       super(context);
       _instanceName = instanceName;
-      _quotaType = context.getJobConfig().getQuotaType();
+      _quotaType = context.getJobConfig().getJobType();
       // Initialize the count for this quotaType if not already done
       if (_quotaType != null && !_quotaTypeExecutionCount.containsKey(_quotaType)) {
         _quotaTypeExecutionCount.put(_quotaType, 0);
@@ -608,10 +608,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     private final String _instanceName;
     private final String _quotaType;
 
-    public LongTask(TaskCallbackContext context, String instanceName) {
+    LongTask(TaskCallbackContext context, String instanceName) {
       super(context);
       _instanceName = instanceName;
-      _quotaType = context.getJobConfig().getQuotaType();
+      _quotaType = context.getJobConfig().getJobType();
       // Initialize the count for this quotaType if not already done
       if (_quotaType != null && !_quotaTypeExecutionCount.containsKey(_quotaType)) {
         _quotaTypeExecutionCount.put(_quotaType, 0);
@@ -643,10 +643,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     private final String _instanceName;
     private final String _quotaType;
 
-    public FailTask(TaskCallbackContext context, String instanceName) {
+    FailTask(TaskCallbackContext context, String instanceName) {
       super(context);
       _instanceName = instanceName;
-      _quotaType = context.getJobConfig().getQuotaType();
+      _quotaType = context.getJobConfig().getJobType();
       // Initialize the count for this quotaType if not already done
       if (_quotaType != null && !_quotaTypeExecutionCount.containsKey(_quotaType)) {
         _quotaTypeExecutionCount.put(_quotaType, 0);

http://git-wip-us.apache.org/repos/asf/helix/blob/36ab2a60/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index 231be06..f303c52 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -25,7 +25,7 @@ import org.testng.annotations.Test;
  */
 public class TestWorkflowTermination extends TaskTestBase {
   private final static String JOB_NAME = "TestJob";
-  private final static String WORKFLOW_TYPE = "TestWorkflow";
+  private final static String WORKFLOW_TYPE = "DEFAULT";
   private static final MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
 
   @BeforeClass