Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/04 04:48:00 UTC

[helix] branch master updated: Refine task framework log (#1565)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e30348  Refine task framework log (#1565)
2e30348 is described below

commit 2e30348d1447f0a107e6c19f59e38d37662787fa
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Dec 3 20:47:50 2020 -0800

    Refine task framework log (#1565)
    
    Refine task framework log.
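    
    The change converges on SLF4J parameterized logging: string
    concatenation and String.format inside log calls are replaced with
    {} placeholders, and job/workflow identifiers are added to log and
    exception messages so a line can be traced back to a specific job.
    A minimal sketch of the before/after style (illustrative names, not
    code from this commit):
    
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
    
        public class ParameterizedLogExample {
          private static final Logger LOG =
              LoggerFactory.getLogger(ParameterizedLogExample.class);
    
          public static void main(String[] args) {
            String workflow = "Workflow_1";
            long elapsedMs = 42L;
            // Before: the message is concatenated eagerly, even if INFO is off.
            LOG.info("Starting workflow " + workflow);
            // After: substitution is deferred until the level check passes.
            LOG.info("Starting workflow {}", workflow);
            LOG.info("Rebalance took {} ms for workflow {}", elapsedMs, workflow);
          }
        }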
---
 .../stages/task/TaskPersistDataStage.java          |  5 +--
 .../apache/helix/task/AbstractTaskDispatcher.java  | 34 +++++++----------
 .../task/FixedTargetTaskAssignmentCalculator.java  | 10 ++---
 .../main/java/org/apache/helix/task/JobConfig.java | 43 ++++++++++++----------
 .../java/org/apache/helix/task/JobDispatcher.java  |  6 +--
 .../java/org/apache/helix/task/JobRebalancer.java  |  6 +--
 .../java/org/apache/helix/task/RuntimeJobDag.java  |  3 +-
 .../java/org/apache/helix/task/ScheduleConfig.java |  2 +-
 .../java/org/apache/helix/task/TaskDriver.java     | 35 ++++++++++--------
 .../java/org/apache/helix/task/TaskStateModel.java |  2 +-
 .../main/java/org/apache/helix/task/TaskUtil.java  | 42 ++++++++++-----------
 .../org/apache/helix/task/WorkflowContext.java     |  2 +-
 .../org/apache/helix/task/WorkflowRebalancer.java  |  6 +--
 .../apache/helix/task/TestCleanExpiredJobs.java    |  2 +-
 14 files changed, 97 insertions(+), 101 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
index 0d522a6..a2e2e15 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
@@ -42,8 +42,7 @@ public class TaskPersistDataStage extends AbstractBaseStage {
     cache.getTaskDataCache().persistDataChanges(manager.getHelixDataAccessor());
 
     long endTime = System.currentTimeMillis();
-    LOG.info(
-        "END TaskPersistDataStage.process() for cluster " + cache.getClusterName() + " took " + (
-            endTime - startTime) + " ms");
+    LOG.info("END TaskPersistDataStage.process() for cluster {} took {} ms", cache.getClusterName(),
+        (endTime - startTime));
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 90019b8..ef0d6b6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -148,8 +148,8 @@ public abstract class AbstractTaskDispatcher {
 
         if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
           LOG.warn(
-              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
-              instance, pId);
+              "Instance {} does not match the assigned participant for pId {} in the job context (job: {}). Skipping task scheduling.",
+              instance, pId, jobCtx.getName());
           continue;
         }
 
@@ -238,16 +238,14 @@ public abstract class AbstractTaskDispatcher {
           // order to avoid scheduling it again in this pipeline.
           assignedPartitions.get(instance).add(pId);
           paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                pName, currState));
-          }
+          LOG.debug(
+              "Task partition {} has completed with state {}. Marking as such in rebalancer context.",
+              pName, currState);
           partitionsToDropFromIs.add(pId);
           // This task is COMPLETED, so release this task
           assignableInstanceManager.release(instance, taskConfig, quotaType);
         }
-          break;
+        break;
         case TIMED_OUT:
 
         case TASK_ERROR:
@@ -260,11 +258,9 @@ public abstract class AbstractTaskDispatcher {
           // (meaning it is not ABORTED and max number of attempts has not been reached yet)
           assignedPartitions.get(instance).add(pId);
           paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
-                pName, currState, jobCtx.getPartitionInfo(pId)));
-          }
+          LOG.debug(
+              "Task partition {} has error state {} with msg {}. Marking as such in rebalancer context.",
+              pName, currState, jobCtx.getPartitionInfo(pId));
           // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts or task is in ABORTED state.
           // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
@@ -276,9 +272,7 @@ public abstract class AbstractTaskDispatcher {
                 || currState.equals(TaskPartitionState.ERROR)) {
               skippedPartitions.add(pId);
               partitionsToDropFromIs.add(pId);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("skippedPartitions:" + skippedPartitions);
-              }
+              LOG.debug("skippedPartitions: {}", skippedPartitions);
             } else {
               // Mark the task to be started at some later time (if enabled)
               markPartitionDelayed(jobCfg, jobCtx, pId);
@@ -323,11 +317,9 @@ public abstract class AbstractTaskDispatcher {
         case DROPPED: {
           // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
           donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has state %s. It will be dropped from the current ideal state.",
-                pName, currState));
-          }
+          LOG.debug(
+              "Task partition {} has state {}. It will be dropped from the current ideal state.",
+              pName, currState);
           // If it's DROPPED, release this task. If INIT, do not release
           if (currState == TaskPartitionState.DROPPED) {
             assignableInstanceManager.release(instance, taskConfig, quotaType);
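
The hunks above also drop LOG.isDebugEnabled() guards around formatted debug
messages. With parameterized logging the guard is usually redundant, since
SLF4J skips message rendering when DEBUG is disabled; the caveat is that
argument expressions are still evaluated eagerly, so a guard remains useful
when computing an argument is expensive. A sketch of that trade-off
(hypothetical helper, not from this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DebugGuardExample {
      private static final Logger LOG = LoggerFactory.getLogger(DebugGuardExample.class);

      // Stand-in for a costly computation, e.g. serializing a large job context.
      static String expensiveSummary() {
        return "...";
      }

      public static void main(String[] args) {
        String pName = "TestJob_0";
        // No guard needed: {} substitution only happens if DEBUG is enabled.
        LOG.debug("Task partition {} has completed.", pName);
        // Guard still worthwhile: expensiveSummary() would otherwise run
        // regardless of the configured log level.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Context dump: {}", expensiveSummary());
        }
      }
    }
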
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 29d3cfd..6850220 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -22,17 +22,15 @@ package org.apache.helix.task;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.util.stream.Collectors;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -113,7 +111,8 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
       if (tgtResourceIs != null) {
         targetPartitions.addAll(tgtResourceIs.getPartitionSet());
       } else {
-        LOG.warn("Missing target resource for the scheduled job!");
+        LOG.warn("Missing target resource for the scheduled job {}!",
+            taskCtx != null ? taskCtx.getName() : "null");
       }
     }
 
@@ -212,7 +211,8 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
     // IdealState of the target resource
     IdealState targetIdealState = idealStateMap.get(jobCfg.getTargetResource());
     if (targetIdealState == null) {
-      LOG.warn("Missing target resource for the scheduled job!");
+      LOG.warn("Missing target resource for the scheduled job {}!",
+          jobContext != null ? jobContext.getName() : "null");
       return Collections.emptyMap();
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 4c127f8..46f6b30 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -729,60 +729,65 @@ public class JobConfig extends ResourceConfig {
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
-            String.format("%s cannot be null", JobConfigProperty.TargetResource));
+            String.format("Job %s, %s cannot be null", _jobId, JobConfigProperty.TargetResource));
       }
-      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
-          && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(
-            String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates));
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
+          .isEmpty()) {
+        throw new IllegalArgumentException(String
+            .format("Job %s, %s cannot be an empty set", _jobId,
+                JobConfigProperty.TargetPartitionStates));
       }
       if (_taskConfigMap.isEmpty()) {
         // Check Job command is not null when none taskconfig specified
         if (_command == null) {
           throw new IllegalArgumentException(
-              String.format("%s cannot be null", JobConfigProperty.Command));
+              String.format("Job %s, %s cannot be null", _jobId, JobConfigProperty.Command));
         }
         // Check number of task is set when Job command is not null and none taskconfig specified
         if (_targetResource == null && _numberOfTasks == 0) {
-          throw new IllegalArgumentException("Either targetResource or numberOfTask should be set");
+          throw new IllegalArgumentException(
+              String.format("Job %s, Either targetResource or numberOfTask should be set", _jobId));
         }
       }
       // Check each either Job command is not null or none of task command is not null
       if (_command == null) {
         for (TaskConfig taskConfig : _taskConfigMap.values()) {
           if (taskConfig.getCommand() == null) {
-            throw new IllegalArgumentException(
-                String.format("Task %s command cannot be null", taskConfig.getId()));
+            throw new IllegalArgumentException(String
+                .format("Job %s, Task %s command cannot be null", _jobId, taskConfig.getId()));
           }
         }
       }
       if (_timeout < TaskConstants.DEFAULT_NEVER_TIMEOUT) {
-        throw new IllegalArgumentException(
-            String.format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout));
+        throw new IllegalArgumentException(String
+            .format("Job %s, %s has invalid value %s", _jobId, JobConfigProperty.Timeout,
+                _timeout));
       }
       if (_timeoutPerTask < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+        throw new IllegalArgumentException(String.format("Job %s, %s has invalid value %s", _jobId,
             JobConfigProperty.TimeoutPerPartition, _timeoutPerTask));
       }
       if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+        throw new IllegalArgumentException(String.format("Job %s, %s has invalid value %s", _jobId,
             JobConfigProperty.ConcurrentTasksPerInstance, _numConcurrentTasksPerInstance));
       }
       if (_maxAttemptsPerTask < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            JobConfigProperty.MaxAttemptsPerTask, _maxAttemptsPerTask));
+        throw new IllegalArgumentException(String
+            .format("Job %s, %s has invalid value %s", _jobId, JobConfigProperty.MaxAttemptsPerTask,
+                _maxAttemptsPerTask));
       }
       if (_maxForcedReassignmentsPerTask < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+        throw new IllegalArgumentException(String.format("Job %s, %s has invalid value %s", _jobId,
             JobConfigProperty.MaxForcedReassignmentsPerTask, _maxForcedReassignmentsPerTask));
       }
       if (_failureThreshold < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            JobConfigProperty.FailureThreshold, _failureThreshold));
+        throw new IllegalArgumentException(String
+            .format("Job %s, %s has invalid value %s", _jobId, JobConfigProperty.FailureThreshold,
+                _failureThreshold));
       }
       if (_workflow == null) {
         throw new IllegalArgumentException(
-            String.format("%s cannot be null", JobConfigProperty.WorkflowID));
+            String.format("Job %s, %s cannot be null", _jobId, JobConfigProperty.WorkflowID));
       }
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 928382d..4b0e00f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -103,7 +103,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     }
 
     if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Job is not ready to be run since workflow is not ready {}", jobName);
+      LOG.info("Job {} is not ready to be run since workflow is not ready.", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -111,7 +111,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
         _dataProvider.getJobConfigMap(), _dataProvider,
         _dataProvider.getAssignableInstanceManager())) {
-      LOG.info("Job is not ready to run {}", jobName);
+      LOG.info("Job {} is not ready to run.", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -145,7 +145,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
             : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
-      LOG.error("No available instance found for job: " + jobName);
+      LOG.error("No available instance found for job: {}", jobName);
     }
 
     TargetState jobTgtState = workflowCfg.getTargetState();
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index a3c3435..4c96e83 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -47,7 +47,7 @@ public class JobRebalancer extends TaskRebalancer {
           jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
-    LOG.debug("Computer Best Partition for job: " + jobName);
+    LOG.debug("Computer Best Partition for job: {}.", jobName);
     if (_jobDispatcher == null) {
       _jobDispatcher = new JobDispatcher();
     }
@@ -56,8 +56,8 @@ public class JobRebalancer extends TaskRebalancer {
     _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
     ResourceAssignment resourceAssignment = _jobDispatcher.processJobStatusUpdateAndAssignment(
         jobName, currStateOutput, clusterData.getWorkflowContext(jobConfig.getWorkflow()));
-    LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s",
-        System.currentTimeMillis() - startTime, jobName));
+    LOG.debug("JobRebalancer computation takes {} ms for Job {}",
+        System.currentTimeMillis() - startTime, jobName);
     return resourceAssignment;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index 5c3379a..24a17da 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -149,7 +149,8 @@ public class RuntimeJobDag extends JobDag {
     if (!_inflightJobList.remove(job)) {
       // this job is not in in-flight list
       LOG.warn(
-          String.format("Job: %s has either finished already, never been scheduled, or been removed from DAG", job));
+          "Job: {} has either finished already, never been scheduled, or been removed from DAG",
+          job);
     }
     // Add finished job's successors to ready-list
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
index 1daa386..de2d6f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
@@ -111,7 +111,7 @@ public class ScheduleConfig {
     if (_recurUnit != null) {
       long converted = _recurUnit.toMillis(_recurInterval);
       if (converted < MIN_RECURRENCE_MILLIS) {
-        LOG.error("Recurrence must be at least " + MIN_RECURRENCE_MILLIS + " ms");
+        LOG.error("Recurrence must be at least {} ms", MIN_RECURRENCE_MILLIS);
         return false;
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c8ed25d..0e1b9b1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -140,7 +140,7 @@ public class TaskDriver {
    * @param flow
    */
   public void start(Workflow flow) {
-    LOG.info("Starting workflow " + flow.getName());
+    LOG.info("Starting workflow {}", flow.getName());
     flow.validate();
 
     validateZKNodeLimitation(flow.getJobConfigs().keySet().size() + 1);
@@ -222,7 +222,7 @@ public class TaskDriver {
     // Should not let user changing DAG in the workflow
     newWorkflowConfig.setJobDag(currentConfig.getJobDag());
     if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
-      LOG.error("Failed to update workflow configuration for workflow " + workflow);
+      LOG.error("Failed to update workflow configuration for workflow {}", workflow);
     }
   }
 
@@ -351,14 +351,13 @@ public class TaskDriver {
     if (workflowState.equals(TaskState.COMPLETED.name())
         || workflowState.equals(TaskState.FAILED.name())
         || workflowState.equals(TaskState.ABORTED.name())) {
-      LOG.warn(
-          "Queue " + queue + " has already reached its final state, skip deleting job from it.");
+      LOG.warn("Queue {} has already reached its final state, skip deleting job from it.", queue);
       return;
     }
 
     if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue,
         Collections.singleton(TaskUtil.getNamespacedJobName(queue, job)), true)) {
-      LOG.error("Failed to delete job " + job + " from queue " + queue);
+      LOG.error("Failed to delete job {} from queue {}.", job, queue);
       throw new HelixException("Failed to delete job " + job + " from queue " + queue);
     }
   }
@@ -401,12 +400,12 @@ public class TaskDriver {
         Set<String> expiredJobs =
             TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
         if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
-          LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
+          LOG.warn("Failed to clean up expired and completed jobs from queue {}", queue);
         }
       }
       workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
       if (workflowConfig.getJobDag().size() >= capacity) {
-        throw new HelixException("Failed to enqueue a job, queue is full.");
+        throw new HelixException(String.format("Failed to enqueue job, queue %s is full.", queue));
       }
     }
 
@@ -726,7 +725,7 @@ public class TaskDriver {
         if (currentData != null) {
           Map<String, Map<String, String>> taskMap = currentData.getMapFields();
           if (taskMap == null) {
-            LOG.warn("Could not update the jobConfig: " + jobName + " Znode MapField is null.");
+            LOG.warn("Could not update the jobConfig: {} Znode MapField is null.", jobName );
             return null;
           }
           Map<String, Map<String, String>> newTaskMap = new HashMap<String, Map<String, String>>();
@@ -804,7 +803,9 @@ public class TaskDriver {
     }
 
     if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) {
-      throw new HelixException("Command must exist in either job or task, not both!");
+      throw new HelixException(String
+          .format("Command must exist in either jobconfig (%s) or taskconfig (%s), not both!", jobName,
+              taskConfig.getId()));
     }
   }
 
@@ -1065,7 +1066,7 @@ public class TaskDriver {
 
     WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
     if (workflowConfig == null) {
-      LOG.warn("WorkflowConfig for " + workflow + " not found!");
+      LOG.warn("WorkflowConfig for {} not found!", workflow);
       return;
     }
 
@@ -1073,8 +1074,8 @@ public class TaskDriver {
     if (state != TargetState.DELETE && workflowContext != null
         && workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
       // Should not update target state for completed workflow
-      LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
-          + state);
+      LOG.info("Workflow {} is already completed, skip to update its target state {}", workflow,
+          state);
       return;
     }
 
@@ -1083,7 +1084,9 @@ public class TaskDriver {
         currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
             state.name());
       } else {
-        LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is null.");
+        LOG.warn(
+            "TargetState DataUpdater: Fails to update target state for {}. CurrentData is null.",
+            workflow);
       }
       return currentData;
     };
@@ -1173,7 +1176,7 @@ public class TaskDriver {
 
     if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
       throw new HelixException(String.format(
-          "Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
+          "Workflow %s context is empty or not in states: %s, current state: %s.",
           workflowName, Arrays.asList(targetStates),
           ctx == null ? "null" : ctx.getWorkflowState().toString()));
     }
@@ -1212,7 +1215,7 @@ public class TaskDriver {
     WorkflowConfig workflowConfig = getWorkflowConfig(workflowName);
 
     if (workflowConfig == null) {
-      throw new HelixException(String.format("Workflow \"%s\" does not exists!", workflowName));
+      throw new HelixException(String.format("Workflow %s does not exists!", workflowName));
     }
 
     long timeToSleep = timeout > 50L ? 50L : timeout;
@@ -1244,7 +1247,7 @@ public class TaskDriver {
       JobConfig jobConfig = getJobConfig(jobName);
       JobContext jbCtx = getJobContext(jobName);
       throw new HelixException(String.format(
-          "Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
+          "Workflow %s context is null or job %s is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
           workflowName, jobName, allowedStates, ctx == null ? "null" : ctx,
           ctx != null ? ctx.getJobState(jobName) : "null", wfcfg, jobConfig, jbCtx));
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 705f118..3771a57 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -89,7 +89,7 @@ public class TaskStateModel extends StateModel {
 
     _taskRunner.cancel();
     TaskResult r = _taskRunner.waitTillDone();
-    LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
+    LOG.info("Task {} completed with result {}.", msg.getPartitionName(), r);
 
     timeout_task.cancel(false);
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index dafd386..e2de939 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -678,19 +678,16 @@ public class TaskUtil {
     }
 
     if (!removeWorkflowConfig(accessor, workflow)) {
-      LOG.warn(
-          String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+      LOG.warn("Error occurred while trying to remove workflow config for {}.", workflow);
       return false;
     }
     if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
-      LOG.warn(String.format(
-          "Error occurred while trying to remove workflow idealstate/externalview for %s.",
-          workflow));
+      LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
+          workflow);
       return false;
     }
     if (!removeWorkflowContext(propertyStore, workflow)) {
-      LOG.warn(String.format("Error occurred while trying to remove workflow context for %s.",
-          workflow));
+      LOG.warn("Error occurred while trying to remove workflow context for {}.", workflow);
       return false;
     }
     return true;
@@ -712,13 +709,13 @@ public class TaskUtil {
       final Set<String> jobs, boolean maintainDependency) {
     boolean success = true;
     if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) {
-      LOG.warn("Error occurred while trying to remove jobs + " + jobs + " from the workflow "
-          + workflow);
+      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}.", jobs,
+          workflow);
       success = false;
     }
     if (!removeJobsState(propertyStore, workflow, jobs)) {
-      LOG.warn("Error occurred while trying to remove jobs states from workflow + " + workflow
-          + " jobs " + jobs);
+      LOG.warn("Error occurred while trying to remove jobs states from workflow {} jobs {}.",
+          workflow, jobs);
       success = false;
     }
     for (String job : jobs) {
@@ -865,12 +862,12 @@ public class TaskUtil {
       return false;
     }
     if (!cleanupJobIdealStateExtView(accessor, job)) {
-      LOG.warn(String.format(
-          "Error occurred while trying to remove job idealstate/externalview for %s.", job));
+      LOG.warn(
+          "Error occurred while trying to remove job idealstate/externalview for {}.", job);
       return false;
     }
     if (!removeJobContext(propertyStore, job)) {
-      LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
+      LOG.warn("Error occurred while trying to remove job context for {}.", job);
       return false;
     }
     return true;
@@ -888,7 +885,7 @@ public class TaskUtil {
           JobDag jobDag = JobDag.fromJson(
               currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
           if (jobDag == null) {
-            LOG.warn("Could not update DAG for workflow: " + workflow + " JobDag is null.");
+            LOG.warn("Could not update DAG for workflow: {} JobDag is null.", workflow);
             return null;
           }
           for (String job : jobsToRemove) {
@@ -907,7 +904,7 @@ public class TaskUtil {
 
     String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath();
     if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) {
-      LOG.warn("Failed to remove jobs " + jobsToRemove + " from DAG of workflow " + workflow);
+      LOG.warn("Failed to remove jobs {} from DAG of workflow {}", jobsToRemove, workflow);
       return false;
     }
 
@@ -940,7 +937,7 @@ public class TaskUtil {
       }
     };
     if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) {
-      LOG.warn("Fail to remove job state for jobs " + jobs + " from workflow " + workflow);
+      LOG.warn("Fail to remove job state for jobs {} from workflow {}", jobs, workflow);
       return false;
     }
     return true;
@@ -951,9 +948,9 @@ public class TaskUtil {
     String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource);
     if (propertyStore.exists(path, AccessOption.PERSISTENT)) {
       if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
-        LOG.warn(String.format(
-            "Error occurred while trying to remove workflow/jobcontext for %s. Failed to remove node %s.",
-            workflowJobResource, path));
+        LOG.warn(
+            "Error occurred while trying to remove workflow/jobcontext for {}. Failed to remove node {}.",
+            workflowJobResource, path);
         return false;
       }
     }
@@ -971,9 +968,8 @@ public class TaskUtil {
     PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
     if (accessor.getPropertyStat(cfgKey) != null) {
       if (!accessor.removeProperty(cfgKey)) {
-        LOG.warn(String.format(
-            "Error occurred while trying to remove config for %s. Failed to remove node %s.",
-            workflowJobResource, cfgKey));
+        LOG.warn("Error occurred while trying to remove config for {}. Failed to remove node {}.",
+            workflowJobResource, cfgKey);
         return false;
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 3396320..6e897c1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -142,7 +142,7 @@ public class WorkflowContext extends HelixProperty {
       long ret = Long.valueOf(t);
       return ret;
     } catch (NumberFormatException e) {
-      LOG.warn("Number error " + t + " for job start time of " + job);
+      LOG.warn("Number error {} for job start time of {}.", t, job, e);
       return -1;
     }
   }
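
The WorkflowContext change above additionally passes the caught exception as a
final argument. SLF4J treats a trailing Throwable that has no matching {}
placeholder as the exception to record, so the stack trace is preserved
without manual concatenation. A sketch (illustrative values, not from this
commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ThrowableArgExample {
      private static final Logger LOG = LoggerFactory.getLogger(ThrowableArgExample.class);

      public static void main(String[] args) {
        String raw = "not-a-number";
        try {
          Long.valueOf(raw);
        } catch (NumberFormatException e) {
          // Two placeholders, three arguments: the final Throwable is logged
          // with its stack trace instead of being substituted into the text.
          LOG.warn("Number error {} for job start time of {}.", raw, "JOB_1", e);
        }
      }
    }
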
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 5dcf3b2..09eb51b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -41,7 +41,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
     final String workflow = resource.getResourceName();
     long startTime = System.currentTimeMillis();
-    LOG.debug("Computer Best Partition for workflow: " + workflow);
+    LOG.debug("Compute Best Partition for workflow: {}", workflow);
     _workflowDispatcher.init(_manager);
     WorkflowContext workflowCtx = _workflowDispatcher
         .getOrInitializeWorkflowContext(workflow, clusterData.getTaskDataCache());
@@ -54,8 +54,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
     _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx, currStateOutput,
         new BestPossibleStateOutput());
 
-    LOG.debug(String.format("WorkflowRebalancer computation takes %d ms for workflow %s",
-        System.currentTimeMillis() - startTime, workflow));
+    LOG.debug("WorkflowRebalancer computation takes {} ms for workflow {}",
+        System.currentTimeMillis() - startTime, workflow);
     return buildEmptyAssignment(workflow, currStateOutput);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index 9f1abbb..3fe6523 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -143,7 +143,7 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
       _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
       Assert.fail("Queue is not full.");
     } catch (HelixException e) {
-      Assert.assertTrue(e.getMessage().contains("queue is full"));
+      Assert.assertTrue(e.getMessage().contains("queue " + queue + " is full"));
     }
 
     for (int i = 0; i < capacity; i++) {