Posted to commits@helix.apache.org by ji...@apache.org on 2020/11/12 01:50:46 UTC

[helix] branch master updated: Refine Task Framework log format (#1511)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new d50872e  Refine Task Framework log format (#1511)
d50872e is described below

commit d50872e9caeabbc5a063bc0133ddc5440ca8b31e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Nov 11 17:49:29 2020 -0800

    Refine Task Framework log format (#1511)
    
    Refine logs in AbstractTaskDispatcher.java, JobDispatcher.java and WorkflowDispatcher.java. This includes:
    
    - Include exception backtrace in error messages.
    - Include JobID in task-related error messages.
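    
    For readers unfamiliar with the idiom, the core refactoring here replaces
    eagerly built messages (String.format wrapped in an isDebugEnabled() guard)
    with SLF4J parameterized logging, which defers message construction until
    the log level is actually enabled. A minimal sketch of the before/after
    pattern, with illustrative names that are not taken from this commit:
    
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
    
        public class LogFormatSketch {
          private static final Logger LOG = LoggerFactory.getLogger(LogFormatSketch.class);
    
          void assign(String pName, String state, String instance) {
            // Before: the message string is built even when DEBUG is disabled,
            // so callers guard the call with an explicit level check.
            if (LOG.isDebugEnabled()) {
              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
                  pName, state, instance));
            }
    
            // After: {} placeholders are substituted only when DEBUG is enabled,
            // so both the guard and String.format become unnecessary.
            LOG.debug("Setting task partition {} state to {} on instance {}.", pName, state,
                instance);
          }
        }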
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 104 ++++++++-------------
 .../java/org/apache/helix/task/JobDispatcher.java  |  36 ++++---
 .../org/apache/helix/task/WorkflowDispatcher.java  |  77 +++++++--------
 3 files changed, 92 insertions(+), 125 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 35c4c39..8f112e9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -170,9 +170,8 @@ public abstract class AbstractTaskDispatcher {
         if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
           TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
           if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
+            LOG.warn("Requested state {} is the same as the current state for instance {}.",
+                requestedState, instance);
           }
 
           // For STOPPED tasks, if the targetState is STOP, we should not honor requestedState
@@ -192,11 +191,8 @@ public abstract class AbstractTaskDispatcher {
             paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
           }
           assignedPartitions.get(instance).add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                String.format("Instance %s requested a state transition to %s for partition %s.",
-                    instance, requestedState, pName));
-          }
+          LOG.debug("Instance {} requested a state transition to {} for partition {}.", instance,
+              requestedState, pName);
           continue;
         }
 
@@ -210,10 +206,8 @@ public abstract class AbstractTaskDispatcher {
           }
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
           assignedPartitions.get(instance).add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                nextState, instance));
-          }
+          LOG.debug("Setting task partition {} state to {} on instance {}.", pName, nextState,
+              instance);
         }
           break;
         case STOPPED: {
@@ -235,12 +229,10 @@ public abstract class AbstractTaskDispatcher {
           paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
           assignedPartitions.get(instance).add(pId);
 
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                nextState, instance));
-          }
+          LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
+              jobCtx.getName(), pName, nextState, instance);
         }
-          break;
+        break;
         case COMPLETED: {
           // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
           // order to avoid scheduling it again in this pipeline.
@@ -470,22 +462,18 @@ public abstract class AbstractTaskDispatcher {
       // so that Helix will cancel the transition.
       paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
       assignedPartitions.get(instance).add(pId);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format(
-            "Task partition %s has a pending state transition on instance %s INIT->RUNNING. CurrentState is %s "
-                + "Setting it back to INIT so that Helix can cancel the transition(if enabled).",
-            pName, instance, currState.name()));
-      }
+      LOG.debug(
+          "Task partition {} has a pending state transition on instance {} INIT->RUNNING. CurrentState is {}. "
+              + "Setting it back to INIT so that Helix can cancel the transition (if enabled).",
+          pName, instance, currState.name());
     } else {
       // Otherwise, Just copy forward
       // the state assignment from the pending message
       paMap.put(pId, new PartitionAssignment(instance, pendingMessage.getToState()));
       assignedPartitions.get(instance).add(pId);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format(
-            "Task partition %s has a pending state transition on instance %s. Using the pending message ToState which was %s.",
-            pName, instance, pendingMessage.getToState()));
-      }
+      LOG.debug(
+          "Task partition {} has a pending state transition on instance {}. Using the pending message ToState which was {}.",
+          pName, instance, pendingMessage.getToState());
     }
   }
 
@@ -673,12 +661,10 @@ public abstract class AbstractTaskDispatcher {
           participantCapacity - cache.getParticipantActiveTaskCount(instance);
       // New tasks to be assigned
       int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format(
-            "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), "
-                + "Participant Max Task(%d). Remaining capacity %d.",
-            instance, jobCfgLimitation, participantCapacity, numToAssign));
-      }
+      LOG.debug(
+          "Throttle tasks to be assigned to instance {} using limitation: Job Concurrent Task({}), "
+              + "Participant Max Task({}). Remaining capacity {}.", instance, jobCfgLimitation,
+          participantCapacity, numToAssign);
       Set<Integer> throttledSet = new HashSet<>();
       if (numToAssign > 0) {
         List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
@@ -700,10 +686,8 @@ public abstract class AbstractTaskDispatcher {
           }
           // Increment the task attempt count at schedule time
           jobCtx.incrementNumAttempts(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
-          }
+          LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
+              jobCtx.getName(), pName, TaskPartitionState.RUNNING, instance);
         }
         cache.setParticipantActiveTaskCount(instance,
             cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
@@ -732,7 +716,8 @@ public abstract class AbstractTaskDispatcher {
           assignableInstanceManager.release(instance, taskConfig, quotaType);
         }
         LOG.debug(
-            throttledSet.size() + "tasks are ready but throttled when assigned to participant.");
+            "tasks for job {} are ready but throttled (size: {}) when assigned to participant.",
+            jobCfg.getJobId(), throttledSet.size());
       }
     }
   }
@@ -889,8 +874,8 @@ public abstract class AbstractTaskDispatcher {
               // Found the partition number; new assignment has been made
               existsInNewAssignment = true;
               LOG.info(
-                  "Currently running task partition number: {} is being dropped from instance: {} and will be newly assigned to instance: {}. This is due to a LiveInstance/CurrentState change, and because this is a targeted task.",
-                  pId, instance, entry.getKey());
+                  "Currently running task partition number: {} (job: {}) is being dropped from instance: {} and will be newly assigned to instance: {}. This is due to a LiveInstance/CurrentState change, and because this is a targeted task.",
+                  pId, jobContext.getName(), instance, entry.getKey());
               break;
             }
           }
@@ -936,6 +921,7 @@ public abstract class AbstractTaskDispatcher {
     finishJobInRuntimeJobDag(clusterDataCache.getTaskDataCache(), workflowConfig.getWorkflowId(),
         jobName);
     long currentTime = System.currentTimeMillis();
+    LOG.debug("Mark job: {} FAILED.", jobName);
     workflowContext.setJobState(jobName, TaskState.FAILED);
     if (jobContext != null) {
       jobContext.setFinishTime(currentTime);
@@ -1208,9 +1194,9 @@ public abstract class AbstractTaskDispatcher {
     }
 
     if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) {
-      LOG.info(String.format(
-          "Job %s not ready to schedule due to not having enough quota for quota type %s", job,
-          quotaType));
+      LOG.info(
+          "Job {} not ready to schedule due to not having enough quota for quota type {}", job,
+          quotaType);
       return false;
     }
 
@@ -1227,10 +1213,8 @@ public abstract class AbstractTaskDispatcher {
 
     // If there is any parent job not started, this job should not be scheduled
     if (notStartedCount > 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job,
-            notStartedCount));
-      }
+        LOG.debug("Job {} is not ready to start, notStartedParent(s)={}.", job,
+            notStartedCount);
       return false;
     }
 
@@ -1238,30 +1222,24 @@ public abstract class AbstractTaskDispatcher {
     // job failure enabled
     if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job,
-            failedOrTimeoutCount));
-      }
+        LOG.debug("Job {} is not ready to start, failedCount(s)={}.", job,
+            failedOrTimeoutCount);
       return false;
     }
 
     if (workflowCfg.isJobQueue()) {
       // If job comes from a JobQueue, it should apply the parallel job logics
       if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job,
-              incompleteAllCount));
-        }
+          LOG.debug("Job {} is not ready to schedule, inCompleteJobs(s)={}.", job,
+              incompleteAllCount);
         return false;
       }
     } else {
       // If this job comes from a generic workflow, job will not be scheduled until
       // all the direct parent jobs finished
       if (incompleteParentCount > 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job,
-              incompleteParentCount));
-        }
+          LOG.debug("Job {} is not ready to start, notFinishedParent(s)={}.", job,
+              incompleteParentCount);
         return false;
       }
     }
@@ -1296,10 +1274,10 @@ public abstract class AbstractTaskDispatcher {
     if (runtimeJobDag != null) {
       runtimeJobDag.finishJob(jobName);
       LOG.debug(
-          String.format("Finish job %s of workflow %s for runtime job DAG", jobName, workflowName));
+          "Finish job {} of workflow {} for runtime job DAG", jobName, workflowName);
     } else {
-      LOG.warn(String.format("Failed to find runtime job DAG for workflow %s and job %s",
-          workflowName, jobName));
+      LOG.warn("Failed to find runtime job DAG for workflow {} and job {}",
+          workflowName, jobName);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 2dad107..218fe61 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -30,14 +30,14 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.helix.util.RebalanceUtil;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
+import org.apache.helix.util.RebalanceUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +58,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Fetch job configuration
     final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
     if (jobCfg == null) {
-      LOG.error("Job configuration is NULL for " + jobName);
+      LOG.error("Job configuration is NULL for {}", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
     String workflowResource = jobCfg.getWorkflow();
@@ -66,19 +66,19 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Fetch workflow configuration and context
     final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
     if (workflowCfg == null) {
-      LOG.error("Workflow configuration is NULL for " + jobName);
+      LOG.error("Workflow configuration is NULL for {}", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
     if (workflowCtx == null) {
-      LOG.error("Workflow context is NULL for " + jobName);
+      LOG.error("Workflow context is NULL for {}", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState != TargetState.START && targetState != TargetState.STOP) {
-      LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
-          + ".Stop scheduling job " + jobName);
+      LOG.info("Target state is {} for workflow {} .Stop scheduling job {}", targetState.name(),
+          workflowResource, jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -90,9 +90,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
         || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
         || jobState == TaskState.TIMED_OUT) {
-      LOG.info(String.format(
-          "Workflow %s or job %s is already in final state, workflow state (%s), job state (%s), clean up job IS.",
-          workflowResource, jobName, workflowState, jobState));
+      LOG.info(
+          "Workflow {} or job {} is already in final state, workflow state ({}), job state ({}), clean up job IS.",
+          workflowResource, jobName, workflowState, jobState);
       finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
       TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       // New pipeline trigger for workflow status update
@@ -103,7 +103,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     }
 
     if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
+      LOG.info("Job is not ready to be run since workflow is not ready {}", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -111,7 +111,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
         _dataProvider.getJobConfigMap(), _dataProvider,
         _dataProvider.getAssignableInstanceManager())) {
-      LOG.info("Job is not ready to run " + jobName);
+      LOG.info("Job is not ready to run {}", jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -145,7 +145,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
             : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
-      LOG.error("No available instance found for job!");
+      LOG.error("No available instance found for job: " + jobName);
     }
 
     TargetState jobTgtState = workflowCfg.getTargetState();
@@ -188,8 +188,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     _dataProvider.updateJobContext(jobName, jobCtx);
     _dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
 
-    LOG.debug("Job " + jobName + " new assignment "
-        + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
+    LOG.debug("Job {} new assignment",
+        Arrays.toString(newAssignment.getMappedPartitions().toArray()));
     return newAssignment;
   }
 
@@ -247,10 +247,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     long currentTime = System.currentTimeMillis();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
-    }
+    LOG.debug("All partitions: {} taskAssignment: {} excludedInstances: {}", allPartitions,
+        currentInstanceToTaskAssignments, excludedInstances);
 
     // Release resource for tasks in terminal state
     updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 68a5be5..fd482bb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -21,10 +21,8 @@ package org.apache.helix.task;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,21 +30,17 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import com.google.common.collect.Lists;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +69,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
     // Fetch workflow configuration and context
     if (workflowCfg == null) {
-      LOG.warn("Workflow configuration is NULL for " + workflow);
+      LOG.warn("Workflow configuration is NULL for {}", workflow);
       return;
     }
 
@@ -83,7 +77,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     // Clean up if workflow marked for deletion
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
-      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
+      LOG.info("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
       updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
       cleanupWorkflow(workflow);
       return;
@@ -126,12 +120,12 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
     // Step 4: Handle finished workflows
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-      LOG.info("Workflow " + workflow + " is finished.");
+      LOG.info("Workflow {} is finished.", workflow);
       updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
       long expiryTime = workflowCfg.getExpiry();
       // Check if this workflow has been finished past its expiry.
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
-        LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
+        LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", workflow);
         cleanupWorkflow(workflow);
       } else {
         // schedule future cleanup work
@@ -157,7 +151,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     // For workflows that have already reached final states, STOP should not take into effect.
     if (!finalStates.contains(workflowCtx.getWorkflowState())
         && TargetState.STOP.equals(targetState)) {
-      LOG.info("Workflow " + workflow + " is marked as stopped. Workflow state is " + workflowCtx.getWorkflowState());
+      LOG.info("Workflow {} is marked as stopped. Workflow state is {}", workflow,
+          workflowCtx.getWorkflowState());
       if (isWorkflowStopped(workflowCtx, workflowCfg)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
         _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
@@ -188,9 +183,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         }
       }
     } else {
-      LOG.warn(String.format(
-          "Failed to find runtime job DAG for workflow %s, existing runtime jobs may not be processed correctly for it",
-          workflow));
+      LOG.warn(
+          "Failed to find runtime job DAG for workflow {}, existing runtime jobs may not be processed correctly for it",
+          workflow);
     }
   }
 
@@ -204,10 +199,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     }
 
     if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Workflow " + workflow + " is not ready to schedule");
-      // set the timer to trigger future schedule
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow,
+      LOG.info("Workflow {} is not ready to schedule, schedule future rebalance at {}", workflow,
           workflowCfg.getStartTime().getTime());
+      // set the timer to trigger future schedule
+      _rebalanceScheduler
+          .scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime());
       return;
     }
 
@@ -219,7 +215,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       scheduleJobs(workflow, workflowCfg, workflowCtx, _clusterDataCache.getJobConfigMap(),
           _clusterDataCache, currentStateOutput, bestPossibleOutput);
     } else {
-      LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
+      LOG.debug("Workflow {} is not ready to be scheduled.", workflow);
     }
     _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
   }
@@ -251,7 +247,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       BestPossibleStateOutput bestPossibleOutput) {
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     if (scheduleConfig != null && scheduleConfig.isRecurring()) {
-      LOG.debug("Jobs from recurring workflow are not schedule-able");
+      LOG.debug("Jobs from recurring workflow {} are not schedule-able", workflow);
       return;
     }
 
@@ -269,19 +265,16 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       String job = nextJob;
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Job " + job + " is already started or completed.");
-        }
+        LOG.debug("Job {} is already started or completed.", job);
         processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
         nextJob = jobDag.getNextJob();
         continue;
       }
 
       if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Workflow %s already have enough job in progress, "
-              + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
-        }
+        LOG.debug(
+            "Workflow {} already have enough job in progress, scheduledJobs(s)={}, stop scheduling more jobs",
+            workflow, scheduledJobs);
         break;
       }
 
@@ -324,7 +317,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput);
     } catch (Exception e) {
       LogUtil.logWarn(LOG, _clusterDataCache.getClusterEventId(),
-          String.format("Failed to compute job assignment for job %s", job));
+          String.format("Failed to compute job assignment for job %s", job, e));
     }
   }
 
@@ -367,9 +360,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       if (scheduleConfig.isRecurring()) {
         // Skip scheduling this workflow if it's not in a start state
         if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
-          }
+          LOG.debug("Skip scheduling since the workflow {} has not been started", workflow);
           return false;
         }
 
@@ -379,7 +370,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
           WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
           if (lastWorkflowCtx != null
               && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
+            LOG.info("Skip scheduling workflow {} since last schedule {} has not completed yet.",
+                workflow, lastScheduled);
             return false;
           }
         }
@@ -395,9 +387,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
         String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Ready to start workflow " + newWorkflowName);
-        }
+        LOG.debug("Ready to start workflow {}", newWorkflowName);
         if (!newWorkflowName.equals(lastScheduled)) {
           Workflow clonedWf =
               cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
@@ -407,9 +397,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
               // Start the cloned workflow
               driver.start(clonedWf);
             } catch (Exception e) {
-              LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-              _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(),
-                  TaskState.FAILED);
+              LOG.error("Failed to schedule cloned workflow {}. ", newWorkflowName, e);
+              _clusterStatusMonitor
+                  .updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
             }
           }
           // Persist workflow start regardless of success to avoid retrying and failing
@@ -451,11 +441,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     Map<String, HelixProperty> resourceConfigMap =
         accessor.getChildValuesMap(keyBuilder.resourceConfigs(), true);
     if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
+      LOG.error("No such workflow named {}", origWorkflowName);
       return null;
     }
     if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+      LOG.error("Workflow with name {} already exists!", newWorkflowName);
       return null;
     }
 
@@ -531,7 +521,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       }
       if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
           _manager.getHelixPropertyStore(), workflow, jobs)) {
-        LOG.warn("Failed to clean up workflow " + workflow);
+        LOG.warn("Failed to clean up workflow {}", workflow);
       } else {
         // Only remove from cache when remove all workflow success. Otherwise, batch write will
         // clean all the contexts even if Configs and IdealStates are exists. Then all the workflows
@@ -539,8 +529,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
       }
     } else {
-      LOG.info("Did not clean up workflow " + workflow
-          + " because neither the workflow is non-terminable nor is set to DELETE.");
+      LOG.info(
+          "Did not clean up workflow {} because neither the workflow is non-terminable nor is set to DELETE.",
+          workflow);
     }
   }
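
For the "include exception backtrace" part of this change, note the relevant
SLF4J behavior: when the last argument of a logging call is a Throwable with no
matching {} placeholder, the full stack trace is printed after the message. By
contrast, String.format silently ignores arguments beyond its format
specifiers, which can drop an exception entirely. A minimal sketch of both
behaviors, with illustrative names not taken from this commit:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExceptionLogSketch {
      private static final Logger LOG = LoggerFactory.getLogger(ExceptionLogSketch.class);

      void schedule(String workflowName) {
        try {
          throw new IllegalStateException("scheduling failed"); // stand-in failure
        } catch (Exception e) {
          // Logs the message with workflowName substituted, followed by the
          // full backtrace of e (the trailing Throwable has no {} placeholder).
          LOG.error("Failed to schedule cloned workflow {}", workflowName, e);

          // Pitfall: String.format ignores the extra argument e, so nothing
          // about the exception reaches the log unless it appears in the
          // format string itself (e.g. "... Exception: %s").
          LOG.error(String.format("Failed to schedule cloned workflow %s", workflowName, e));
        }
      }
    }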