You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/11/12 01:50:46 UTC
[helix] branch master updated: Refine Task Framework log format (#1511)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new d50872e Refine Task Framework log format (#1511)
d50872e is described below
commit d50872e9caeabbc5a063bc0133ddc5440ca8b31e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Nov 11 17:49:29 2020 -0800
Refine Task Framework log format (#1511)
Refine logs in AbstractTaskDispatcher.java, JobDispatcher.java and WorkflowDispatcher.java. This includes:
- Include exception backtrace in error messages.
- Include JobID in task-related error messages.
---
.../apache/helix/task/AbstractTaskDispatcher.java | 104 ++++++++-------------
.../java/org/apache/helix/task/JobDispatcher.java | 36 ++++---
.../org/apache/helix/task/WorkflowDispatcher.java | 77 +++++++--------
3 files changed, 92 insertions(+), 125 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 35c4c39..8f112e9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -170,9 +170,8 @@ public abstract class AbstractTaskDispatcher {
if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
if (requestedState.equals(currState)) {
- LOG.warn(String.format(
- "Requested state %s is the same as the current state for instance %s.",
- requestedState, instance));
+ LOG.warn("Requested state {} is the same as the current state for instance {}.",
+ requestedState, instance);
}
// For STOPPED tasks, if the targetState is STOP, we should not honor requestedState
@@ -192,11 +191,8 @@ public abstract class AbstractTaskDispatcher {
paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
}
assignedPartitions.get(instance).add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- String.format("Instance %s requested a state transition to %s for partition %s.",
- instance, requestedState, pName));
- }
+ LOG.debug("Instance {} requested a state transition to {} for partition {}.", instance,
+ requestedState, pName);
continue;
}
@@ -210,10 +206,8 @@ public abstract class AbstractTaskDispatcher {
}
paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
assignedPartitions.get(instance).add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- nextState, instance));
- }
+ LOG.debug("Setting task partition {} state to {} on instance {}.", pName, nextState,
+ instance);
}
break;
case STOPPED: {
@@ -235,12 +229,10 @@ public abstract class AbstractTaskDispatcher {
paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
assignedPartitions.get(instance).add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- nextState, instance));
- }
+ LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
+ jobCtx.getName(), pName, nextState, instance);
}
- break;
+ break;
case COMPLETED: {
// The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
// order to avoid scheduling it again in this pipeline.
@@ -470,22 +462,18 @@ public abstract class AbstractTaskDispatcher {
// so that Helix will cancel the transition.
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
assignedPartitions.get(instance).add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Task partition %s has a pending state transition on instance %s INIT->RUNNING. CurrentState is %s "
- + "Setting it back to INIT so that Helix can cancel the transition(if enabled).",
- pName, instance, currState.name()));
- }
+ LOG.debug(
+ "Task partition {} has a pending state transition on instance {} INIT->RUNNING. CurrentState is {} "
+ + "Setting it back to INIT so that Helix can cancel the transition(if enabled).",
+ pName, instance, currState.name());
} else {
// Otherwise, Just copy forward
// the state assignment from the pending message
paMap.put(pId, new PartitionAssignment(instance, pendingMessage.getToState()));
assignedPartitions.get(instance).add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Task partition %s has a pending state transition on instance %s. Using the pending message ToState which was %s.",
- pName, instance, pendingMessage.getToState()));
- }
+ LOG.debug(
+ "Task partition {} has a pending state transition on instance {}. Using the pending message ToState which was {}.",
+ pName, instance, pendingMessage.getToState());
}
}
@@ -673,12 +661,10 @@ public abstract class AbstractTaskDispatcher {
participantCapacity - cache.getParticipantActiveTaskCount(instance);
// New tasks to be assigned
int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), "
- + "Participant Max Task(%d). Remaining capacity %d.",
- instance, jobCfgLimitation, participantCapacity, numToAssign));
- }
+ LOG.debug(
+ "Throttle tasks to be assigned to instance {} using limitation: Job Concurrent Task({}), "
+ + "Participant Max Task({}). Remaining capacity {}.", instance, jobCfgLimitation,
+ participantCapacity, numToAssign);
Set<Integer> throttledSet = new HashSet<>();
if (numToAssign > 0) {
List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
@@ -700,10 +686,8 @@ public abstract class AbstractTaskDispatcher {
}
// Increment the task attempt count at schedule time
jobCtx.incrementNumAttempts(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- TaskPartitionState.RUNNING, instance));
- }
+ LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
+ jobCtx.getName(), pName, TaskPartitionState.RUNNING, instance);
}
cache.setParticipantActiveTaskCount(instance,
cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
@@ -732,7 +716,8 @@ public abstract class AbstractTaskDispatcher {
assignableInstanceManager.release(instance, taskConfig, quotaType);
}
LOG.debug(
- throttledSet.size() + "tasks are ready but throttled when assigned to participant.");
+ "tasks for job {} are ready but throttled (size: {}) when assigned to participant.",
+ jobCfg.getJobId(), throttledSet.size());
}
}
}
@@ -889,8 +874,8 @@ public abstract class AbstractTaskDispatcher {
// Found the partition number; new assignment has been made
existsInNewAssignment = true;
LOG.info(
- "Currently running task partition number: {} is being dropped from instance: {} and will be newly assigned to instance: {}. This is due to a LiveInstance/CurrentState change, and because this is a targeted task.",
- pId, instance, entry.getKey());
+ "Currently running task partition number: {} (job: {}) is being dropped from instance: {} and will be newly assigned to instance: {}. This is due to a LiveInstance/CurrentState change, and because this is a targeted task.",
+ jobContext.getName(), pId, instance, entry.getKey());
break;
}
}
@@ -936,6 +921,7 @@ public abstract class AbstractTaskDispatcher {
finishJobInRuntimeJobDag(clusterDataCache.getTaskDataCache(), workflowConfig.getWorkflowId(),
jobName);
long currentTime = System.currentTimeMillis();
+ LOG.debug("Mark job: {} FAILED.", jobName);
workflowContext.setJobState(jobName, TaskState.FAILED);
if (jobContext != null) {
jobContext.setFinishTime(currentTime);
@@ -1208,9 +1194,9 @@ public abstract class AbstractTaskDispatcher {
}
if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) {
- LOG.info(String.format(
- "Job %s not ready to schedule due to not having enough quota for quota type %s", job,
- quotaType));
+ LOG.info(
+ "Job {} not ready to schedule due to not having enough quota for quota type {}", job,
+ quotaType);
return false;
}
@@ -1227,10 +1213,8 @@ public abstract class AbstractTaskDispatcher {
// If there is any parent job not started, this job should not be scheduled
if (notStartedCount > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job,
- notStartedCount));
- }
+ LOG.debug("Job {} is not ready to start, notStartedParent(s)={}.", job,
+ notStartedCount);
return false;
}
@@ -1238,30 +1222,24 @@ public abstract class AbstractTaskDispatcher {
// job failure enabled
if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job,
- failedOrTimeoutCount));
- }
+ LOG.debug("Job {} is not ready to start, failedCount(s)={}.", job,
+ failedOrTimeoutCount);
return false;
}
if (workflowCfg.isJobQueue()) {
// If job comes from a JobQueue, it should apply the parallel job logics
if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job,
- incompleteAllCount));
- }
+ LOG.debug("Job {} is not ready to schedule, inCompleteJobs(s)={}.", job,
+ incompleteAllCount);
return false;
}
} else {
// If this job comes from a generic workflow, job will not be scheduled until
// all the direct parent jobs finished
if (incompleteParentCount > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job,
- incompleteParentCount));
- }
+ LOG.debug("Job {} is not ready to start, notFinishedParent(s)={}.", job,
+ incompleteParentCount);
return false;
}
}
@@ -1296,10 +1274,10 @@ public abstract class AbstractTaskDispatcher {
if (runtimeJobDag != null) {
runtimeJobDag.finishJob(jobName);
LOG.debug(
- String.format("Finish job %s of workflow %s for runtime job DAG", jobName, workflowName));
+ "Finish job {} of workflow {} for runtime job DAG", jobName, workflowName);
} else {
- LOG.warn(String.format("Failed to find runtime job DAG for workflow %s and job %s",
- workflowName, jobName));
+ LOG.warn("Failed to find runtime job DAG for workflow {} and job {}",
+ workflowName, jobName);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 2dad107..218fe61 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -30,14 +30,14 @@ import java.util.TreeMap;
import java.util.TreeSet;
import com.google.common.collect.ImmutableMap;
-import org.apache.helix.util.RebalanceUtil;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
+import org.apache.helix.util.RebalanceUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
// Fetch job configuration
final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
if (jobCfg == null) {
- LOG.error("Job configuration is NULL for " + jobName);
+ LOG.error("Job configuration is NULL for {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
String workflowResource = jobCfg.getWorkflow();
@@ -66,19 +66,19 @@ public class JobDispatcher extends AbstractTaskDispatcher {
// Fetch workflow configuration and context
final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
if (workflowCfg == null) {
- LOG.error("Workflow configuration is NULL for " + jobName);
+ LOG.error("Workflow configuration is NULL for {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
if (workflowCtx == null) {
- LOG.error("Workflow context is NULL for " + jobName);
+ LOG.error("Workflow context is NULL for {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
TargetState targetState = workflowCfg.getTargetState();
if (targetState != TargetState.START && targetState != TargetState.STOP) {
- LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
- + ".Stop scheduling job " + jobName);
+ LOG.info("Target state is {} for workflow {} .Stop scheduling job {}", targetState.name(),
+ workflowResource, jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -90,9 +90,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
|| jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
|| jobState == TaskState.TIMED_OUT) {
- LOG.info(String.format(
- "Workflow %s or job %s is already in final state, workflow state (%s), job state (%s), clean up job IS.",
- workflowResource, jobName, workflowState, jobState));
+ LOG.info(
+ "Workflow {} or job {} is already in final state, workflow state ({}), job state ({}), clean up job IS.",
+ workflowResource, jobName, workflowState, jobState);
finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
// New pipeline trigger for workflow status update
@@ -103,7 +103,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
}
if (!isWorkflowReadyForSchedule(workflowCfg)) {
- LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
+ LOG.info("Job is not ready to be run since workflow is not ready {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -111,7 +111,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
_dataProvider.getJobConfigMap(), _dataProvider,
_dataProvider.getAssignableInstanceManager())) {
- LOG.info("Job is not ready to run " + jobName);
+ LOG.info("Job is not ready to run {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -145,7 +145,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
: _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
- LOG.error("No available instance found for job!");
+ LOG.error("No available instance found for job: " + jobName);
}
TargetState jobTgtState = workflowCfg.getTargetState();
@@ -188,8 +188,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
_dataProvider.updateJobContext(jobName, jobCtx);
_dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
- LOG.debug("Job " + jobName + " new assignment "
- + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
+ LOG.debug("Job {} new assignment",
+ Arrays.toString(newAssignment.getMappedPartitions().toArray()));
return newAssignment;
}
@@ -247,10 +247,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
long currentTime = System.currentTimeMillis();
- if (LOG.isDebugEnabled()) {
- LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
- + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
- }
+ LOG.debug("All partitions: {} taskAssignment: {} excludedInstances: {}", allPartitions,
+ currentInstanceToTaskAssignments, excludedInstances);
// Release resource for tasks in terminal state
updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 68a5be5..fd482bb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -21,10 +21,8 @@ package org.apache.helix.task;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -32,21 +30,17 @@ import java.util.Set;
import java.util.TimeZone;
import com.google.common.collect.Lists;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +69,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Fetch workflow configuration and context
if (workflowCfg == null) {
- LOG.warn("Workflow configuration is NULL for " + workflow);
+ LOG.warn("Workflow configuration is NULL for {}", workflow);
return;
}
@@ -83,7 +77,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
- LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
+ LOG.info("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
cleanupWorkflow(workflow);
return;
@@ -126,12 +120,12 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Step 4: Handle finished workflows
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
- LOG.info("Workflow " + workflow + " is finished.");
+ LOG.info("Workflow {} is finished.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
long expiryTime = workflowCfg.getExpiry();
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
- LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
+ LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", workflow);
cleanupWorkflow(workflow);
} else {
// schedule future cleanup work
@@ -157,7 +151,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// For workflows that have already reached final states, STOP should not take into effect.
if (!finalStates.contains(workflowCtx.getWorkflowState())
&& TargetState.STOP.equals(targetState)) {
- LOG.info("Workflow " + workflow + " is marked as stopped. Workflow state is " + workflowCtx.getWorkflowState());
+ LOG.info("Workflow {} is marked as stopped. Workflow state is {}", workflow,
+ workflowCtx.getWorkflowState());
if (isWorkflowStopped(workflowCtx, workflowCfg)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
@@ -188,9 +183,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
}
}
} else {
- LOG.warn(String.format(
- "Failed to find runtime job DAG for workflow %s, existing runtime jobs may not be processed correctly for it",
- workflow));
+ LOG.warn(
+ "Failed to find runtime job DAG for workflow {}, existing runtime jobs may not be processed correctly for it",
+ workflow);
}
}
@@ -204,10 +199,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
}
if (!isWorkflowReadyForSchedule(workflowCfg)) {
- LOG.info("Workflow " + workflow + " is not ready to schedule");
- // set the timer to trigger future schedule
- _rebalanceScheduler.scheduleRebalance(_manager, workflow,
+ LOG.info("Workflow {} is not ready to schedule, schedule future rebalance at {}", workflow,
workflowCfg.getStartTime().getTime());
+ // set the timer to trigger future schedule
+ _rebalanceScheduler
+ .scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime());
return;
}
@@ -219,7 +215,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
scheduleJobs(workflow, workflowCfg, workflowCtx, _clusterDataCache.getJobConfigMap(),
_clusterDataCache, currentStateOutput, bestPossibleOutput);
} else {
- LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
+ LOG.debug("Workflow {} is not ready to be scheduled.", workflow);
}
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
@@ -251,7 +247,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
BestPossibleStateOutput bestPossibleOutput) {
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
- LOG.debug("Jobs from recurring workflow are not schedule-able");
+ LOG.debug("Jobs from recurring workflow {} are not schedule-able", workflow);
return;
}
@@ -269,19 +265,16 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
String job = nextJob;
TaskState jobState = workflowCtx.getJobState(job);
if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Job " + job + " is already started or completed.");
- }
+ LOG.debug("Job {} is already started or completed.", job);
processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
nextJob = jobDag.getNextJob();
continue;
}
if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Workflow %s already have enough job in progress, "
- + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
- }
+ LOG.debug(
+ "Workflow {} already have enough job in progress, scheduledJobs(s)={}, stop scheduling more jobs",
+ workflow, scheduledJobs);
break;
}
@@ -324,7 +317,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput);
} catch (Exception e) {
LogUtil.logWarn(LOG, _clusterDataCache.getClusterEventId(),
- String.format("Failed to compute job assignment for job %s", job));
+ String.format("Failed to compute job assignment for job %s", job, e));
}
}
@@ -367,9 +360,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
if (scheduleConfig.isRecurring()) {
// Skip scheduling this workflow if it's not in a start state
if (!workflowCfg.getTargetState().equals(TargetState.START)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
- }
+ LOG.debug("Skip scheduling since the workflow {} has not been started", workflow);
return false;
}
@@ -379,7 +370,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
if (lastWorkflowCtx != null
&& lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
- LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
+ LOG.info("Skip scheduling workflow {} since last schedule {} has not completed yet.",
+ workflow, lastScheduled);
return false;
}
}
@@ -395,9 +387,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to start workflow " + newWorkflowName);
- }
+ LOG.debug("Ready to start workflow {}", newWorkflowName);
if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
@@ -407,9 +397,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Start the cloned workflow
driver.start(clonedWf);
} catch (Exception e) {
- LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
- _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(),
- TaskState.FAILED);
+ LOG.error("Failed to schedule cloned workflow {}. ", newWorkflowName, e);
+ _clusterStatusMonitor
+ .updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
}
}
// Persist workflow start regardless of success to avoid retrying and failing
@@ -451,11 +441,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
Map<String, HelixProperty> resourceConfigMap =
accessor.getChildValuesMap(keyBuilder.resourceConfigs(), true);
if (!resourceConfigMap.containsKey(origWorkflowName)) {
- LOG.error("No such workflow named " + origWorkflowName);
+ LOG.error("No such workflow named {}", origWorkflowName);
return null;
}
if (resourceConfigMap.containsKey(newWorkflowName)) {
- LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+ LOG.error("Workflow with name {} already exists!", newWorkflowName);
return null;
}
@@ -531,7 +521,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
}
if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
_manager.getHelixPropertyStore(), workflow, jobs)) {
- LOG.warn("Failed to clean up workflow " + workflow);
+ LOG.warn("Failed to clean up workflow {}", workflow);
} else {
// Only remove from cache when remove all workflow success. Otherwise, batch write will
// clean all the contexts even if Configs and IdealStates are exists. Then all the workflows
@@ -539,8 +529,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
}
} else {
- LOG.info("Did not clean up workflow " + workflow
- + " because neither the workflow is non-terminable nor is set to DELETE.");
+ LOG.info(
+ "Did not clean up workflow {} because neither the workflow is non-terminable nor is set to DELETE.",
+ workflow);
}
}