Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:19:51 UTC

[helix] 17/44: Task Framework code style change

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a8e2cf7b76a802a5b005d32082699d4c9e39875a
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Mar 29 12:08:07 2019 -0700

    Task Framework code style change
    
    This diff includes style changes using Java 8 features.
    
    RB=1613441
    BUG=HELIX-1742
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 110 ++++-----
 .../java/org/apache/helix/task/JobDispatcher.java  |  21 +-
 .../java/org/apache/helix/task/TaskDriver.java     | 263 ++++++++++-----------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  25 +-
 .../integration/task/TestQuotaBasedScheduling.java |  45 ++--
 .../helix/integration/task/TestTaskRebalancer.java |  27 +--
 6 files changed, 224 insertions(+), 267 deletions(-)
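
Most of the mechanical churn below comes from two Java 7/8 features: the diamond operator, which drops redundant type arguments from constructor calls, and lambdas, which replace anonymous implementations of single-method interfaces. A minimal standalone sketch of both patterns (names here are illustrative, not from the Helix codebase):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    public class StyleSketch {
      public static void main(String[] args) throws Exception {
        // Diamond operator (Java 7+): the compiler infers the type argument
        List<String> oldStyle = new ArrayList<String>(); // before
        List<String> newStyle = new ArrayList<>();       // after

        // Lambda (Java 8+): replaces an anonymous class for any
        // single-abstract-method ("functional") interface
        Callable<Object> before = new Callable<Object>() {
          @Override
          public Object call() {
            return "done";
          }
        };
        Callable<Object> after = () -> "done";

        System.out.println(oldStyle.equals(newStyle));           // true
        System.out.println(before.call().equals(after.call()));  // true
      }
    }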

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 78a7419..698730e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -79,7 +79,7 @@ public abstract class AbstractTaskDispatcher {
 
     // Iterate through all instances
     for (String instance : prevInstanceToTaskAssignments.keySet()) {
-      assignedPartitions.put(instance, new HashSet<Integer>());
+      assignedPartitions.put(instance, new HashSet<>());
 
       // Set all dropping transitions first. These are tasks coming from Participant disconnects
       // that have some active current state (INIT or RUNNING) and the requestedState of DROPPED.
@@ -709,7 +709,7 @@ public abstract class AbstractTaskDispatcher {
 
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, Set<Integer> throttled, int n) {
-    List<Integer> result = new ArrayList<Integer>();
+    List<Integer> result = new ArrayList<>();
     for (Integer pId : candidatePartitions) {
       if (!excluded.contains(pId)) {
         if (result.size() < n) {
@@ -932,8 +932,8 @@ public abstract class AbstractTaskDispatcher {
                   TaskConfig taskConfig = taskEntry.getValue();
                   for (String assignableInstanceName : assignableInstanceManager
                       .getAssignableInstanceNames()) {
-                    assignableInstanceManager
-                        .release(assignableInstanceName, taskConfig, quotaType);
+                    assignableInstanceManager.release(assignableInstanceName, taskConfig,
+                        quotaType);
                   }
                 }
               }
@@ -1120,9 +1120,9 @@ public abstract class AbstractTaskDispatcher {
     }
 
     if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) {
-      LOG.info(String
-          .format("Job %s not ready to schedule due to not having enough quota for quota type %s",
-              job, quotaType));
+      LOG.info(String.format(
+          "Job %s not ready to schedule due to not having enough quota for quota type %s", job,
+          quotaType));
       return false;
     }
 
@@ -1210,9 +1210,8 @@ public abstract class AbstractTaskDispatcher {
       LOG.debug(
           String.format("Finish job %s of workflow %s for runtime job DAG", jobName, workflowName));
     } else {
-      LOG.warn(String
-          .format("Failed to find runtime job DAG for workflow %s and job %s", workflowName,
-              jobName));
+      LOG.warn(String.format("Failed to find runtime job DAG for workflow %s and job %s",
+          workflowName, jobName));
     }
   }
 
@@ -1228,21 +1227,18 @@ public abstract class AbstractTaskDispatcher {
   protected static void reportSubmissionToProcessDelay(BaseControllerDataProvider dataProvider,
       final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
       final JobConfig jobConfig, final long currentTimestamp) {
-    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
-      @Override
-      public Object call() {
-        // Asynchronously update the appropriate JobMonitor
-        JobMonitor jobMonitor = clusterStatusMonitor
-            .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
-        if (jobMonitor == null) {
-          return null;
-        }
-
-        // Compute SubmissionToProcessDelay
-        long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
-        jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay);
+    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+      // Asynchronously update the appropriate JobMonitor
+      JobMonitor jobMonitor = clusterStatusMonitor
+          .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+      if (jobMonitor == null) {
         return null;
       }
+
+      // Compute SubmissionToProcessDelay
+      long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+      jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay);
+      return null;
     });
   }
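
Callable<Object> qualifies as a functional interface (one abstract method, call()), which is what lets the anonymous class above collapse into a lambda; the target type is inferred from asyncExecute's Callable parameter. A self-contained sketch of the same conversion against a plain ExecutorService (the executor and the printed "metric" are stand-ins, not Helix code):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Lambda standing in for the old anonymous Callable<Object>. The body
        // still ends with "return null" because Callable.call() is not void,
        // which is why the refactored Helix methods keep that line too.
        Callable<Object> reportDelay = () -> {
          System.out.println("gauge updated asynchronously");
          return null;
        };

        pool.submit(reportDelay).get(); // block so the sketch prints before exit
        pool.shutdown();
      }
    }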
 
@@ -1258,21 +1254,18 @@ public abstract class AbstractTaskDispatcher {
   private static void reportSubmissionToScheduleDelay(BaseControllerDataProvider dataProvider,
       final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
       final JobConfig jobConfig, final long currentTimestamp) {
-    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
-      @Override
-      public Object call() {
-        // Asynchronously update the appropriate JobMonitor
-        JobMonitor jobMonitor = clusterStatusMonitor
-            .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
-        if (jobMonitor == null) {
-          return null;
-        }
-
-        // Compute SubmissionToScheduleDelay
-        long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
-        jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay);
+    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+      // Asynchronously update the appropriate JobMonitor
+      JobMonitor jobMonitor = clusterStatusMonitor
+          .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+      if (jobMonitor == null) {
         return null;
       }
+
+      // Compute SubmissionToScheduleDelay
+      long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+      jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay);
+      return null;
     });
   }
 
@@ -1288,32 +1281,29 @@ public abstract class AbstractTaskDispatcher {
   private static void reportControllerInducedDelay(BaseControllerDataProvider dataProvider,
       final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
       final JobConfig jobConfig, final long currentTimestamp) {
-    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
-      @Override
-      public Object call() {
-        // Asynchronously update the appropriate JobMonitor
-        JobMonitor jobMonitor = clusterStatusMonitor
-            .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
-        if (jobMonitor == null) {
-          return null;
-        }
-
-        // Compute ControllerInducedDelay only if the workload is a test load
-        // NOTE: this metric cannot be computed for general user-submitted workloads because
-        // the actual runtime of the tasks vary, and there could exist multiple tasks per
-        // job
-        // NOTE: a test workload will have the "latency" field in the mapField of the
-        // JobConfig (taskConfig)
-        String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next();
-        if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) {
-          long taskDuration =
-              Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG));
-          long controllerInducedDelay =
-              currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration;
-          jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay);
-        }
+    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+      // Asynchronously update the appropriate JobMonitor
+      JobMonitor jobMonitor = clusterStatusMonitor
+          .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+      if (jobMonitor == null) {
         return null;
       }
+
+      // Compute ControllerInducedDelay only if the workload is a test workload
+      // NOTE: this metric cannot be computed for general user-submitted workloads because
+      // the actual runtimes of the tasks vary, and there could be multiple tasks per
+      // job
+      // NOTE: a test workload will have the "latency" field in the mapField of the
+      // JobConfig (taskConfig)
+      String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next();
+      if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) {
+        long taskDuration =
+            Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG));
+        long controllerInducedDelay =
+            currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration;
+        jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay);
+      }
+      return null;
     });
   }
 }
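
One detail the reformatting above leaves as-is: Long.valueOf returns a boxed Long that is immediately auto-unboxed into the primitive taskDuration, whereas Long.parseLong parses straight to the primitive. Both are correct; a tiny illustration (the latency string is a made-up stand-in for the TASK_LATENCY_TAG map value):

    public class ParseSketch {
      public static void main(String[] args) {
        String latency = "1500";
        long viaValueOf = Long.valueOf(latency);     // boxes, then auto-unboxes
        long viaParseLong = Long.parseLong(latency); // no intermediate object
        System.out.println(viaValueOf == viaParseLong); // true
      }
    }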
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 56e04ac..c8cf09a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -29,10 +29,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
@@ -157,8 +155,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     jobState = workflowCtx.getJobState(jobName);
     workflowState = workflowCtx.getWorkflowState();
 
-    if (INTERMEDIATE_STATES.contains(jobState) && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
-        || TaskState.TIMED_OUT.equals(workflowState))) {
+    if (INTERMEDIATE_STATES.contains(jobState)
+        && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
+            || TaskState.TIMED_OUT.equals(workflowState))) {
       jobState = TaskState.TIMING_OUT;
       workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
     } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
@@ -272,7 +271,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
           paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
         }
         Partition partition = new Partition(pName(jobResource, pId));
-        Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
+        Message pendingMessage =
+            currStateOutput.getPendingMessage(jobResource, partition, instance);
         // While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
         // so that Helix will cancel the transition.
         if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
@@ -303,7 +303,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // can be dropped (note that Helix doesn't track whether the drop succeeded or not).
     if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
       handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
-      finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(), jobResource);
+      finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(),
+          jobResource);
       return buildEmptyAssignment(jobResource, currStateOutput);
     }
 
@@ -340,8 +341,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       TaskPartitionState state = jobContext.getPartitionState(pId);
       Partition partition = new Partition(pName(jobResource, pId));
       String instance = jobContext.getAssignedParticipant(pId);
-      Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
-          instance);
+      Message pendingMessage =
+          currentStateOutput.getPendingMessage(jobResource, partition, instance);
       // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
       if (state == TaskPartitionState.RUNNING
           || (state == TaskPartitionState.INIT && pendingMessage != null)) {
@@ -389,7 +390,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
-      result.put(instance, new TreeSet<Integer>());
+      result.put(instance, new TreeSet<>());
     }
 
     // First, add all task partitions from JobContext
@@ -438,7 +439,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
           // Check if this is a dropping transition
           if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) {
             if (!tasksToDrop.containsKey(instance)) {
-              tasksToDrop.put(instance, new HashSet<Integer>());
+              tasksToDrop.put(instance, new HashSet<>());
             }
             tasksToDrop.get(instance).add(pId);
           }
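
The tasksToDrop bookkeeping above still uses the containsKey-then-put dance. Since the codebase is adopting Java 8 idioms anyway, Map.computeIfAbsent expresses the same thing in one call; a sketch with made-up instance and partition values:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class DropSketch {
      public static void main(String[] args) {
        Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
        String instance = "localhost_12918";
        int pId = 7;

        // Equivalent to: if (!containsKey(instance)) put(instance, new HashSet<>());
        // followed by get(instance).add(pId)
        tasksToDrop.computeIfAbsent(instance, k -> new HashSet<>()).add(pId);

        System.out.println(tasksToDrop); // {localhost_12918=[7]}
      }
    }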
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 5f4ac14..2f0be85 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -71,7 +71,6 @@ public class TaskDriver {
   /** For logging */
   private static final Logger LOG = LoggerFactory.getLogger(TaskDriver.class);
 
-
   /** Default time out for monitoring workflow or job state */
   private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
 
@@ -93,20 +92,21 @@ public class TaskDriver {
   private final HelixAdmin _admin;
   private final String _clusterName;
 
-
   public TaskDriver(HelixManager manager) {
     this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
         manager.getHelixPropertyStore(), manager.getClusterName());
   }
 
   public TaskDriver(HelixZkClient client, String clusterName) {
-    this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
+    this(client, new ZkBaseDataAccessor<>(client), clusterName);
   }
 
-  public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
+  public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
+      String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ZkHelixPropertyStore<>(baseAccessor,
-            PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
+        new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName),
+            null),
+        clusterName);
   }
 
   @Deprecated
@@ -123,9 +123,8 @@ public class TaskDriver {
     _clusterName = clusterName;
   }
 
-
-  /** Schedules a new workflow
-   *
+  /**
+   * Schedules a new workflow
    * @param flow
    */
   public void start(Workflow flow) {
@@ -137,7 +136,7 @@ public class TaskDriver {
     WorkflowConfig newWorkflowConfig =
         new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
 
-    Map<String, String> jobTypes = new HashMap<String, String>();
+    Map<String, String> jobTypes = new HashMap<>();
     // add all job configs.
     for (String job : flow.getJobConfigs().keySet()) {
       JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
@@ -196,13 +195,12 @@ public class TaskDriver {
           .setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), workflow);
     }
     if (workflow == null || !workflow.equals(newWorkflowConfig.getWorkflowId())) {
-      throw new HelixException(String
-          .format("Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}",
-              workflow, newWorkflowConfig.getWorkflowId()));
+      throw new HelixException(String.format(
+          "Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", workflow,
+          newWorkflowConfig.getWorkflowId()));
     }
 
-    WorkflowConfig currentConfig =
-        TaskUtil.getWorkflowConfig(_accessor, workflow);
+    WorkflowConfig currentConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
     if (currentConfig == null) {
       throw new HelixException("Workflow " + workflow + " does not exist!");
     }
@@ -223,7 +221,6 @@ public class TaskDriver {
 
   /**
    * Creates a new named job queue (workflow)
-   *
    * @param queue
    */
   public void createQueue(JobQueue queue) {
@@ -233,7 +230,6 @@ public class TaskDriver {
   /**
    * Remove all completed or failed jobs in a job queue
    * Same as {@link #cleanupQueue(String)}
-   *
    * @param queue name of the queue
    * @throws Exception
    */
@@ -256,7 +252,8 @@ public class TaskDriver {
    * the queue has to be stopped prior to this call
    * @param queue queue name
    * @param job job name, denamespaced
-   * @param forceDelete
+   * @param forceDelete CAUTION: if set to true, all of the job's related ZK nodes will
+   *          be cleaned up from ZooKeeper even if its workflow information cannot be found.
    */
   public void deleteJob(final String queue, final String job, boolean forceDelete) {
     deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), forceDelete);
@@ -344,20 +341,17 @@ public class TaskDriver {
 
   /**
   * Adds a new job to the end of an existing named queue.
-   *
    * @param queue
    * @param job
    * @param jobBuilder
    * @throws Exception
    */
-  public void enqueueJob(final String queue, final String job,
-      JobConfig.Builder jobBuilder) {
+  public void enqueueJob(final String queue, final String job, JobConfig.Builder jobBuilder) {
     enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder));
   }
 
   /**
   * Batch add jobs to a queue; the jobs are guaranteed to be appended in the given order.
-   *
    * @param queue
    * @param jobs
    * @param jobBuilders
@@ -365,7 +359,6 @@ public class TaskDriver {
   public void enqueueJobs(final String queue, final List<String> jobs,
       final List<JobConfig.Builder> jobBuilders) {
 
-
     // Get the job queue config and capacity
     WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
     if (workflowConfig == null) {
@@ -419,78 +412,75 @@ public class TaskDriver {
     }
 
     // update the job dag to append the job to the end of the queue.
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          // For some reason, the WorkflowConfig for this JobQueue doesn't exist
-          // In this case, we cannot proceed and must alert the user
-          throw new HelixException(
-              String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
-        }
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData == null) {
+        // For some reason, the WorkflowConfig for this JobQueue doesn't exist
+        // In this case, we cannot proceed and must alert the user
+        throw new HelixException(
+            String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
+      }
+
+      // Add the node to the existing DAG
+      JobDag jobDag = JobDag
+          .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
+      Set<String> allNodes = jobDag.getAllNodes();
+      if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+        throw new IllegalStateException(
+            String.format("Queue %s already reaches its max capacity %f, failed to add %s", queue,
+                capacity, jobs.toString()));
+      }
 
-        // Add the node to the existing DAG
-        JobDag jobDag = JobDag.fromJson(
-            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        Set<String> allNodes = jobDag.getAllNodes();
-        if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+      String lastNodeName = null;
+      for (int i = 0; i < namespacedJobNames.size(); i++) {
+        String namespacedJobName = namespacedJobNames.get(i);
+        if (allNodes.contains(namespacedJobName)) {
           throw new IllegalStateException(String
-              .format("Queue %s already reaches its max capacity %f, failed to add %s", queue,
-                  capacity, jobs.toString()));
+              .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
         }
-
-        String lastNodeName = null;
-        for (int i = 0; i < namespacedJobNames.size(); i++) {
-          String namespacedJobName = namespacedJobNames.get(i);
-          if (allNodes.contains(namespacedJobName)) {
-            throw new IllegalStateException(String
-                .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
-          }
-          jobDag.addNode(namespacedJobName);
-
-          // Add the node to the end of the queue
-          String candidate = null;
-          if (lastNodeName == null) {
-            for (String node : allNodes) {
-              if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
-                candidate = node;
-                break;
-              }
+        jobDag.addNode(namespacedJobName);
+
+        // Add the node to the end of the queue
+        String candidate = null;
+        if (lastNodeName == null) {
+          for (String node : allNodes) {
+            if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
+              candidate = node;
+              break;
             }
-          } else {
-            candidate = lastNodeName;
-          }
-          if (candidate != null) {
-            jobDag.addParentToChild(candidate, namespacedJobName);
-            lastNodeName = namespacedJobName;
           }
+        } else {
+          candidate = lastNodeName;
+        }
+        if (candidate != null) {
+          jobDag.addParentToChild(candidate, namespacedJobName);
+          lastNodeName = namespacedJobName;
         }
+      }
 
-        // Add job type if job type is not null
-        Map<String, String> jobTypes =
-            currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
-        for (String jobType : jobTypeList) {
-          if (jobType != null) {
-            if (jobTypes == null) {
-              jobTypes = new HashMap<>();
-            }
-            jobTypes.put(queue, jobType);
+      // Add job type if job type is not null
+      Map<String, String> jobTypes =
+          currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+      for (String jobType : jobTypeList) {
+        if (jobType != null) {
+          if (jobTypes == null) {
+            jobTypes = new HashMap<>();
           }
+          jobTypes.put(queue, jobType);
         }
+      }
 
-        if (jobTypes != null) {
-          currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
-        }
-        // Save the updated DAG
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
-        }
-        return currentData;
+      if (jobTypes != null) {
+        currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
       }
+      // Save the updated DAG
+      try {
+        currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(),
+            jobDag.toJson());
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
+      }
+      return currentData;
     };
 
     String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
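
DataUpdater<ZNRecord> is another single-method interface (update), which is what lets the large anonymous updater above become a lambda while the body stays essentially the same. A trimmed, self-contained sketch of the shape; the local interface below is a stand-in with the same signature, since the real DataUpdater comes from the ZkClient dependency:

    public class UpdaterSketch {
      // Stand-in for the single-method DataUpdater<T> used by the data accessor
      interface DataUpdater<T> {
        T update(T currentData);
      }

      public static void main(String[] args) {
        // Old style: anonymous class
        DataUpdater<String> before = new DataUpdater<String>() {
          @Override
          public String update(String currentData) {
            return currentData == null ? null : currentData + "-v2";
          }
        };
        // New style: lambda with the identical body
        DataUpdater<String> after =
            currentData -> currentData == null ? null : currentData + "-v2";

        System.out.println(before.update("dag").equals(after.update("dag"))); // true
      }
    }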
@@ -540,11 +530,11 @@ public class TaskDriver {
       throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
     }
 
-    Set<String> jobs = new HashSet<String>();
+    Set<String> jobs = new HashSet<>();
     for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
       TaskState curState = wCtx.getJobState(jobNode);
-      if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
-          || curState == TaskState.FAILED)) {
+      if (curState != null && (curState == TaskState.ABORTED
+          || curState == TaskState.COMPLETED || curState == TaskState.FAILED)) {
         jobs.add(jobNode);
       }
     }
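
A note on the condition above: && binds tighter than ||, so the parentheses around the ||-chain are what keep the null guard applying to every comparison rather than only the first. A minimal demonstration of the general pitfall:

    public class PrecedenceSketch {
      public static void main(String[] args) {
        boolean guard = false; // e.g. a null check that failed
        boolean a = false;
        boolean b = true;

        boolean parenthesized = guard && (a || b); // guard covers the whole chain
        boolean bare = guard && a || b;            // parsed as (guard && a) || b

        System.out.println(parenthesized + " vs " + bare); // false vs true
      }
    }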
@@ -558,8 +548,7 @@ public class TaskDriver {
     _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
 
     IdealState is = buildWorkflowIdealState(workflow);
-    TaskUtil
-        .createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+    TaskUtil.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
 
     _admin.setResourceIdealState(_clusterName, workflow, is);
   }
@@ -576,12 +565,12 @@ public class TaskDriver {
 
   private IdealState buildWorkflowIdealState(String workflow) {
     CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow);
-    IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1)
-        .setNumPartitions(1).setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView();
+    IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1).setNumPartitions(1)
+        .setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView();
 
     IdealState is = IsBuilder.build();
-    is.getRecord().setListField(workflow, new ArrayList<String>());
-    is.getRecord().setMapField(workflow, new HashMap<String, String>());
+    is.getRecord().setListField(workflow, new ArrayList<>());
+    is.getRecord().setMapField(workflow, new HashMap<>());
     is.setRebalancerClassName(WorkflowRebalancer.class.getName());
 
     return is;
@@ -611,23 +600,19 @@ public class TaskDriver {
 
   /**
    * Public async method to stop a workflow/queue.
-   *
    * This call only sends a STOP command to Helix; it does not check
    * whether the workflow (all jobs) has been stopped yet.
-   *
    * @param workflow
    */
-  public void stop(String workflow) throws InterruptedException {
+  public void stop(String workflow) {
     setWorkflowTargetState(workflow, TargetState.STOP);
   }
 
   /**
    * Public sync method to stop a workflow/queue with timeout
-   *
    * The workflow and all of its jobs have been stopped if this method returns successfully.
-   *
-   * @param workflow  The workflow name
-   * @param timeout   The timeout for stopping workflow/queue in milisecond
+   * @param workflow The workflow name
+   * @param timeout The timeout for stopping the workflow/queue, in milliseconds
    */
   public void waitToStop(String workflow, long timeout) throws InterruptedException {
     setWorkflowTargetState(workflow, TargetState.STOP);
@@ -636,7 +621,8 @@ public class TaskDriver {
     while (System.currentTimeMillis() <= endTime) {
       WorkflowContext workflowContext = getWorkflowContext(workflow);
 
-      if (workflowContext == null || TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) {
+      if (workflowContext == null
+          || TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) {
         Thread.sleep(1000);
       } else {
         // Successfully stopped
@@ -651,7 +637,6 @@ public class TaskDriver {
 
   /**
    * Public method to delete a workflow/queue.
-   *
    * @param workflow
    */
   public void delete(String workflow) {
@@ -660,11 +645,10 @@ public class TaskDriver {
 
   /**
    * Public method to delete a workflow/queue.
-   *
    * @param workflow
    * @param forceDelete CAUTION: if set to true, the workflow and all of its jobs' related zk nodes will
-   *                     be clean up immediately from zookeeper, no matter whether there are jobs
-   *                     are running or not.
+   *          be cleaned up immediately from ZooKeeper, regardless of whether
+   *          any jobs are still running.
    */
   public void delete(String workflow, boolean forceDelete) {
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
@@ -693,7 +677,8 @@ public class TaskDriver {
 
   private void removeWorkflowFromZK(String workflow) {
     Set<String> jobSet = new HashSet<>();
-    // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow
+    // Note that even if WorkflowConfig is null, the workflow still needs to be removed
+    // as long as WorkflowContext exists
     WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow);
     if (wCfg != null) {
       jobSet.addAll(wCfg.getJobDag().getAllNodes());
@@ -709,11 +694,11 @@ public class TaskDriver {
    * Public synchronous method to wait for a delete operation to fully complete, with timeout.
    * When this method returns, it means that a queue (workflow) has been completely deleted, meaning
    * its IdealState, WorkflowConfig, and WorkflowContext have all been deleted.
-   *
    * @param workflow workflow/jobqueue name
    * @param timeout duration to allow for the delete operation to complete
    */
-  public void deleteAndWaitForCompletion(String workflow, long timeout) throws InterruptedException {
+  public void deleteAndWaitForCompletion(String workflow, long timeout)
+      throws InterruptedException {
     delete(workflow);
     long endTime = System.currentTimeMillis() + timeout;
 
@@ -746,9 +731,11 @@ public class TaskDriver {
     if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
       failed.append("WorkflowContext ");
     }
-    throw new HelixException(String
-        .format("Failed to delete the workflow/queue %s within %d milliseconds. "
-            + "The following components still remain: %s", workflow, timeout, failed.toString()));
+    throw new HelixException(
+        String.format(
+            "Failed to delete the workflow/queue %s within %d milliseconds. "
+                + "The following components still remain: %s",
+            workflow, timeout, failed.toString()));
   }
 
   /**
@@ -780,30 +767,27 @@ public class TaskDriver {
     }
 
     WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
-    if (state != TargetState.DELETE && workflowContext != null &&
-        workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
+    if (state != TargetState.DELETE && workflowContext != null
+        && workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
       // Should not update target state for completed workflow
       LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
           + state);
       return;
     }
 
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
-              state.name());
-        } else {
-          LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is "
-              + currentData);
-        }
-        return currentData;
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+            state.name());
+      } else {
+        LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is null.");
       }
+      return currentData;
     };
 
     PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
-    _accessor.getBaseDataAccessor()
-        .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT);
+    _accessor.getBaseDataAccessor().update(workflowConfigKey.getPath(), updater,
+        AccessOption.PERSISTENT);
     RebalanceScheduler.invokeRebalance(_accessor, workflow);
   }
 
@@ -841,11 +825,10 @@ public class TaskDriver {
 
   /**
    * Batch get the configurations of all workflows in this cluster.
-   *
    * @return
    */
   public Map<String, WorkflowConfig> getWorkflows() {
-    Map<String, WorkflowConfig> workflowConfigMap = new HashMap<String, WorkflowConfig>();
+    Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
     Map<String, ResourceConfig> resourceConfigMap =
         _accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs());
 
@@ -865,7 +848,6 @@ public class TaskDriver {
    * This call will block until either the workflow reaches one of the states specified
    * in the arguments, or the timeout happens. If the timeout happens, it will throw a
    * HelixException; otherwise, it will return the current workflow state.
-   *
    * @param workflowName The workflow to be monitored
    * @param timeout A long integer representing the timeout, in milliseconds
    * @param targetStates Specified states that user would like to stop monitoring
@@ -877,14 +859,15 @@ public class TaskDriver {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
-    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
+    Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(targetStates));
 
     long timeToSleep = timeout > 100L ? 100L : timeout;
     do {
       Thread.sleep(timeToSleep);
       ctx = getWorkflowContext(workflowName);
-    } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
-        .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + timeout);
+    } while ((ctx == null || ctx.getWorkflowState() == null
+        || !allowedStates.contains(ctx.getWorkflowState()))
+        && System.currentTimeMillis() < st + timeout);
 
     if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
       throw new HelixException(String.format(
@@ -900,7 +883,6 @@ public class TaskDriver {
    * This is a wrapper function that sets a default timeout of 2 MINUTES for monitoring a workflow.
    * If the timeout happens, it will throw a HelixException; otherwise, it will return the
    * current workflow state.
-   *
    * @param workflowName The workflow to be monitored
    * @param targetStates Specified states that user would like to stop monitoring
    * @return A TaskState, which is current workflow state
@@ -915,7 +897,6 @@ public class TaskDriver {
    * This call will block until either the specified job reaches one of the states specified
    * in the arguments, or the timeout happens. If the timeout happens, it will throw a
    * HelixException; otherwise, it will return the current job state.
-   *
    * @param workflowName The workflow that contains the job to monitor
    * @param jobName The specified job to monitor
    * @param timeout A long integer representing the timeout, in milliseconds
@@ -952,8 +933,9 @@ public class TaskDriver {
     do {
       Thread.sleep(timeToSleep);
       ctx = getWorkflowContext(workflowName);
-    } while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates
-        .contains(ctx.getJobState(jobName))) && System.currentTimeMillis() < st + timeout);
+    } while ((ctx == null || ctx.getJobState(jobName) == null
+        || !allowedStates.contains(ctx.getJobState(jobName)))
+        && System.currentTimeMillis() < st + timeout);
 
     if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
       throw new HelixException(
@@ -968,7 +950,6 @@ public class TaskDriver {
    * This is a wrapper function for monitoring job state with a default timeout of 2 MINUTES.
    * If the timeout happens, it will throw a HelixException; otherwise, it will return the
    * current job state.
-   *
    * @param workflowName The workflow that contains the job to monitor
    * @param jobName The specified job to monitor
    * @param states Specified states that user would like to stop monitoring
@@ -981,12 +962,12 @@ public class TaskDriver {
   }
 
   /**
-   * This function returns the timestamp of the very last task that was scheduled. It is provided to help determine
+   * This function returns the timestamp of the very last task that was scheduled. It is provided to
+   * help determine
    * whether a given Workflow/Job/Task is stuck.
-   *
    * @param workflowName The name of the workflow
    * @return timestamp of the most recent job scheduled.
-   * -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
+   *         -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
    */
   public long getLastScheduledTaskTimestamp(String workflowName) {
     return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp();
@@ -998,7 +979,6 @@ public class TaskDriver {
     Integer taskPartitionIndex = null;
     TaskPartitionState state = null;
 
-
     WorkflowContext workflowContext = getWorkflowContext(workflowName);
     if (workflowContext != null) {
       Map<String, TaskState> allJobStates = workflowContext.getJobStates();
@@ -1035,9 +1015,8 @@ public class TaskDriver {
    * @param taskName name of task. Optional if scope is WORKFLOW or JOB
    * @return null if key-value pair not found or this content store does not exist. Otherwise,
    *         return a String
-   *
    * @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)},
-   * {@link #getJobUserContentMap(String, String)},
+   *             {@link #getJobUserContentMap(String, String)},
    *             {@link #getTaskContentMap(String, String, String)}
    */
   @Deprecated
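
A limit worth knowing on the diamond conversions in this file: before Java 9, the diamond cannot be used with anonymous inner classes, so an anonymous Callable or DataUpdater had to spell out its type argument; converting to a lambda, as this commit does, sidesteps the restriction entirely. Sketch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    public class DiamondSketch {
      public static void main(String[] args) throws Exception {
        // Plain constructor: the diamond is fine on Java 7+
        List<String> results = new ArrayList<>();

        // Anonymous class: "new Callable<>() { ... }" compiles only on Java 9+,
        // so on Java 8 the explicit type argument must stay...
        Callable<String> explicit = new Callable<String>() {
          @Override
          public String call() {
            return "ok";
          }
        };
        // ...or the anonymous class becomes a lambda instead
        Callable<String> lambda = () -> "ok";

        results.add(explicit.call());
        results.add(lambda.call());
        System.out.println(results); // [ok, ok]
      }
    }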
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 51b21eb..83a790f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -69,7 +69,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
   // Split it into status update and assignment. But there are a couple of pieces of
   // data that need to be passed around.
-  public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) {
+  public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
+      BestPossibleStateOutput bestPossibleOutput) {
 
     // Fetch workflow configuration and context
     if (workflowCfg == null) {
@@ -93,7 +95,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       // If timeout point has already been passed, it will not be scheduled
       scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
 
-      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
+      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
+          && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
         workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
         _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
       }
@@ -107,7 +110,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
     // Step 3: handle workflow that should STOP
     // For workflows that already reached final states, STOP should not take effect
-    if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP.equals(targetState)) {
+    if (!finalStates.contains(workflowCtx.getWorkflowState())
+        && TargetState.STOP.equals(targetState)) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
       if (isWorkflowStopped(workflowCtx, workflowCfg)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
@@ -202,8 +206,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
   }
 
-  public WorkflowContext getOrInitializeWorkflowContext(
-      String workflowName, TaskDataCache cache) {
+  public WorkflowContext getOrInitializeWorkflowContext(String workflowName, TaskDataCache cache) {
     WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName);
     if (workflowCtx == null) {
       workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
@@ -380,8 +383,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
     jobIS = builder.build();
     for (int i = 0; i < numPartitions; i++) {
-      jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
-      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
+      jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<>());
+      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<>());
     }
     jobIS.setRebalancerClassName(JobRebalancer.class.getName());
     admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
@@ -443,7 +446,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Ready to start workflow " + newWorkflowName);
         }
-        if (lastScheduled == null || !newWorkflowName.equals(lastScheduled)) {
+        if (!newWorkflowName.equals(lastScheduled)) {
           Workflow clonedWf =
               cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
           TaskDriver driver = new TaskDriver(_manager);
@@ -453,7 +456,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
               driver.start(clonedWf);
             } catch (Exception e) {
               LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-              _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
+              _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(),
+                  TaskState.FAILED);
             }
           }
           // Persist workflow start regardless of success to avoid retrying and failing
@@ -582,7 +586,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         // and jobs will be rescheduled again.
         removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
       }
-     } else {
+    } else {
       LOG.info("Did not clean up workflow " + workflow
           + " because neither the workflow is non-terminable nor is set to DELETE.");
     }
@@ -598,5 +602,4 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     }
     cache.removeContext(workflow);
   }
-
 }
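
The lastScheduled simplification earlier in this file leans on the equals() contract: x.equals(null) is specified to return false, so newWorkflowName.equals(lastScheduled) already yields false when lastScheduled is null, making the separate null check redundant (newWorkflowName itself is non-null at that point). Objects.equals is the variant to reach for when the receiver may be null as well:

    import java.util.Objects;

    public class NullSafeEqualsSketch {
      public static void main(String[] args) {
        String newWorkflowName = "myWorkflow_1553886487"; // illustrative value
        String lastScheduled = null;

        // equals(null) returns false by contract, so the negation is true
        System.out.println(!newWorkflowName.equals(lastScheduled));          // true
        // Fully null-safe on both sides
        System.out.println(!Objects.equals(newWorkflowName, lastScheduled)); // true
      }
    }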
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index 6080399..1e06269 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -37,7 +37,6 @@ import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
@@ -59,8 +58,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
   private static final String JOB_COMMAND = "DummyCommand";
   private Map<String, String> _jobCommandMap;
   private Map<String, Integer> _quotaTypeExecutionCount = new ConcurrentHashMap<>();
-  private Set<String> _availableQuotaTypes =
-      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private Set<String> _availableQuotaTypes = Collections.newSetFromMap(new ConcurrentHashMap<>());
   private boolean _finishTask = false;
 
   @BeforeClass
@@ -87,24 +85,9 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
       // Set task callbacks
       Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
-      TaskFactory shortTaskFactory = new TaskFactory() {
-        @Override
-        public Task createNewTask(TaskCallbackContext context) {
-          return new ShortTask(context, instanceName);
-        }
-      };
-      TaskFactory longTaskFactory = new TaskFactory() {
-        @Override
-        public Task createNewTask(TaskCallbackContext context) {
-          return new LongTask(context, instanceName);
-        }
-      };
-      TaskFactory failTaskFactory = new TaskFactory() {
-        @Override
-        public Task createNewTask(TaskCallbackContext context) {
-          return new FailTask(context, instanceName);
-        }
-      };
+      TaskFactory shortTaskFactory = context -> new ShortTask(context, instanceName);
+      TaskFactory longTaskFactory = context -> new LongTask(context, instanceName);
+      TaskFactory failTaskFactory = context -> new FailTask(context, instanceName);
       taskFactoryReg.put("ShortTask", shortTaskFactory);
       taskFactoryReg.put("LongTask", longTaskFactory);
       taskFactoryReg.put("FailTask", failTaskFactory);
@@ -155,7 +138,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     for (int i = 0; i < 10; i++) {
       List<TaskConfig> taskConfigs = new ArrayList<>();
-      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
       JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
           .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
       workflowBuilder.addJob("JOB" + i, jobConfigBulider);
@@ -193,7 +176,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     for (int i = 0; i < 10; i++) {
       List<TaskConfig> taskConfigs = new ArrayList<>();
-      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
       JobConfig.Builder jobConfigBulider =
           new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs)
               .setJobCommandConfigMap(_jobCommandMap).setJobType("UNDEFINED");
@@ -231,7 +214,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     for (int i = 0; i < 5; i++) {
       List<TaskConfig> taskConfigs = new ArrayList<>();
-      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
       JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
           .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A");
       workflowBuilder.addJob("JOB" + i, jobConfigBulider);
@@ -239,7 +222,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     for (int i = 5; i < 10; i++) {
       List<TaskConfig> taskConfigs = new ArrayList<>();
-      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+      taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
       JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
           .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("B");
       workflowBuilder.addJob("JOB" + i, jobConfigBulider);
@@ -283,8 +266,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     }
 
     // Test that the next two are not executing
-    JobContext context_2 = _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
-    JobContext context_3 = _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
+    JobContext context_2 =
+        _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
+    JobContext context_3 =
+        _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
     Assert.assertNull(context_2.getPartitionState(0));
     Assert.assertNull(context_3.getPartitionState(0));
 
@@ -493,7 +478,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     for (int i = 0; i < numWorkflows; i++) {
       String workflowName = workflowNames.get(i);
       TaskState state = (i % 3 == 1) ? TaskState.FAILED : TaskState.COMPLETED;
-      Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
+      Assert.assertEquals(TaskDriver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
           state);
     }
 
@@ -536,7 +521,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     // First run some jobs with quotaType A
     List<TaskConfig> taskConfigs = new ArrayList<>();
-    taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+    taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
     JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
         .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A");
 
@@ -553,7 +538,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     // Run some jobs with quotaType B
     // Reuse the same ShortTask setup as above
     taskConfigs = new ArrayList<>();
-    taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+    taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
     jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs)
         .setJobCommandConfigMap(_jobCommandMap).setJobType("B");
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index e12b4a9..6de1d3f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -63,8 +63,7 @@ public class TestTaskRebalancer extends TaskTestBase {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(commandConfig);
 
-    Workflow flow = WorkflowGenerator
-        .generateSingleJobWorkflowBuilder(jobName, jobBuilder)
+    Workflow flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName, jobBuilder)
         .setExpiry(expiry).build();
 
     _driver.start(flow);
@@ -77,8 +76,8 @@ public class TestTaskRebalancer extends TaskTestBase {
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobName);
 
     // Ensure context and config exist
-    Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
-        AccessOption.PERSISTENT));
+    Assert.assertTrue(
+        _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
     // Wait for job to finish and expire
@@ -86,8 +85,8 @@ public class TestTaskRebalancer extends TaskTestBase {
     Thread.sleep(expiry + 100);
 
     // Ensure workflow config and context were cleaned up by now
-    Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
-        AccessOption.PERSISTENT));
+    Assert.assertFalse(
+        _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
     Assert.assertNull(accessor.getProperty(workflowCfgKey));
   }
 
@@ -120,7 +119,8 @@ public class TestTaskRebalancer extends TaskTestBase {
     }
   }
 
-  @Test public void partitionSet() throws Exception {
+  @Test
+  public void partitionSet() throws Exception {
     final String jobResource = "partitionSet";
     ImmutableList<String> targetPartitions =
         ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
@@ -170,7 +170,8 @@ public class TestTaskRebalancer extends TaskTestBase {
     }
   }
 
-  @Test public void timeouts() throws Exception {
+  @Test
+  public void timeouts() throws Exception {
     final String jobResource = "timeouts";
 
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
@@ -216,12 +217,10 @@ public class TestTaskRebalancer extends TaskTestBase {
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
     Set<String> slave = Sets.newHashSet("SLAVE");
-    JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
-    JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     _driver.enqueueJob(queueName, "masterJob", job1);
     _driver.enqueueJob(queueName, "slaveJob", job2);