You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/04 04:48:00 UTC
[helix] branch master updated: Refine task framework log (#1565)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 2e30348 Refine task framework log (#1565)
2e30348 is described below
commit 2e30348d1447f0a107e6c19f59e38d37662787fa
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Dec 3 20:47:50 2020 -0800
Refine task framework log (#1565)
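Refine task framework logging: replace string concatenation and String.format calls with parameterized ({}-placeholder) log messages, drop the now-redundant isDebugEnabled() guards, and include the job, workflow, or queue name in log and exception messages.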
---
.../stages/task/TaskPersistDataStage.java | 5 +--
.../apache/helix/task/AbstractTaskDispatcher.java | 34 +++++++----------
.../task/FixedTargetTaskAssignmentCalculator.java | 10 ++---
.../main/java/org/apache/helix/task/JobConfig.java | 43 ++++++++++++----------
.../java/org/apache/helix/task/JobDispatcher.java | 6 +--
.../java/org/apache/helix/task/JobRebalancer.java | 6 +--
.../java/org/apache/helix/task/RuntimeJobDag.java | 3 +-
.../java/org/apache/helix/task/ScheduleConfig.java | 2 +-
.../java/org/apache/helix/task/TaskDriver.java | 35 ++++++++++--------
.../java/org/apache/helix/task/TaskStateModel.java | 2 +-
.../main/java/org/apache/helix/task/TaskUtil.java | 42 ++++++++++-----------
.../org/apache/helix/task/WorkflowContext.java | 2 +-
.../org/apache/helix/task/WorkflowRebalancer.java | 6 +--
.../apache/helix/task/TestCleanExpiredJobs.java | 2 +-
14 files changed, 97 insertions(+), 101 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
index 0d522a6..a2e2e15 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
@@ -42,8 +42,7 @@ public class TaskPersistDataStage extends AbstractBaseStage {
cache.getTaskDataCache().persistDataChanges(manager.getHelixDataAccessor());
long endTime = System.currentTimeMillis();
- LOG.info(
- "END TaskPersistDataStage.process() for cluster " + cache.getClusterName() + " took " + (
- endTime - startTime) + " ms");
+ LOG.info("END TaskPersistDataStage.process() for cluster {} took {} ms", cache.getClusterName(),
+ (endTime - startTime));
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 90019b8..ef0d6b6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -148,8 +148,8 @@ public abstract class AbstractTaskDispatcher {
if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
LOG.warn(
- "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
- instance, pId);
+ "Instance {} does not match the assigned participant for pId {} in the job context (job: {}). Skipping task scheduling.",
+ instance, pId, jobCtx.getName());
continue;
}
@@ -238,16 +238,14 @@ public abstract class AbstractTaskDispatcher {
// order to avoid scheduling it again in this pipeline.
assignedPartitions.get(instance).add(pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
- pName, currState));
- }
+ LOG.debug(
+ "Task partition {} has completed with state {}. Marking as such in rebalancer context.",
+ pName, currState);
partitionsToDropFromIs.add(pId);
// This task is COMPLETED, so release this task
assignableInstanceManager.release(instance, taskConfig, quotaType);
}
- break;
+ break;
case TIMED_OUT:
case TASK_ERROR:
@@ -260,11 +258,9 @@ public abstract class AbstractTaskDispatcher {
// (meaning it is not ABORTED and max number of attempts has not been reached yet)
assignedPartitions.get(instance).add(pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
- pName, currState, jobCtx.getPartitionInfo(pId)));
- }
+ LOG.debug(
+ "Task partition {} has error state {} with msg {}. Marking as such in rebalancer context.",
+ pName, currState, jobCtx.getPartitionInfo(pId));
// The error policy is to fail the task as soon a single partition fails for a specified
// maximum number of attempts or task is in ABORTED state.
// But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
@@ -276,9 +272,7 @@ public abstract class AbstractTaskDispatcher {
|| currState.equals(TaskPartitionState.ERROR)) {
skippedPartitions.add(pId);
partitionsToDropFromIs.add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("skippedPartitions:" + skippedPartitions);
- }
+ LOG.debug("skippedPartitions: {}", skippedPartitions);
} else {
// Mark the task to be started at some later time (if enabled)
markPartitionDelayed(jobCfg, jobCtx, pId);
@@ -323,11 +317,9 @@ public abstract class AbstractTaskDispatcher {
case DROPPED: {
// currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
donePartitions.add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Task partition %s has state %s. It will be dropped from the current ideal state.",
- pName, currState));
- }
+ LOG.debug(
+ "Task partition {} has state {}. It will be dropped from the current ideal state.",
+ pName, currState);
// If it's DROPPED, release this task. If INIT, do not release
if (currState == TaskPartitionState.DROPPED) {
assignableInstanceManager.release(instance, taskConfig, quotaType);
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 29d3cfd..6850220 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -22,17 +22,15 @@ package org.apache.helix.task;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.util.stream.Collectors;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -113,7 +111,8 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
if (tgtResourceIs != null) {
targetPartitions.addAll(tgtResourceIs.getPartitionSet());
} else {
- LOG.warn("Missing target resource for the scheduled job!");
+ LOG.warn("Missing target resource for the scheduled job {}!",
+ taskCtx != null ? taskCtx.getName() : "null");
}
}
@@ -212,7 +211,8 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
// IdealState of the target resource
IdealState targetIdealState = idealStateMap.get(jobCfg.getTargetResource());
if (targetIdealState == null) {
- LOG.warn("Missing target resource for the scheduled job!");
+ LOG.warn("Missing target resource for the scheduled job {}!",
+ jobContext != null ? jobContext.getName() : "null");
return Collections.emptyMap();
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 4c127f8..46f6b30 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -729,60 +729,65 @@ public class JobConfig extends ResourceConfig {
private void validate() {
if (_taskConfigMap.isEmpty() && _targetResource == null) {
throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.TargetResource));
+ String.format("Job %s, %s cannot be null", _jobId, JobConfigProperty.TargetResource));
}
- if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
- && _targetPartitionStates.isEmpty()) {
- throw new IllegalArgumentException(
- String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates));
+ if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
+ .isEmpty()) {
+ throw new IllegalArgumentException(String
+ .format("Job %s, %s cannot be an empty set", _jobId,
+ JobConfigProperty.TargetPartitionStates));
}
if (_taskConfigMap.isEmpty()) {
// Check Job command is not null when none taskconfig specified
if (_command == null) {
throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.Command));
+ String.format("Job %s, %s cannot be null", _jobId, JobConfigProperty.Command));
}
// Check number of task is set when Job command is not null and none taskconfig specified
if (_targetResource == null && _numberOfTasks == 0) {
- throw new IllegalArgumentException("Either targetResource or numberOfTask should be set");
+ throw new IllegalArgumentException(
+ String.format("Job %s, Either targetResource or numberOfTask should be set", _jobId));
}
}
// Check each either Job command is not null or none of task command is not null
if (_command == null) {
for (TaskConfig taskConfig : _taskConfigMap.values()) {
if (taskConfig.getCommand() == null) {
- throw new IllegalArgumentException(
- String.format("Task %s command cannot be null", taskConfig.getId()));
+ throw new IllegalArgumentException(String
+ .format("Job %s, Task %s command cannot be null", _jobId, taskConfig.getId()));
}
}
}
if (_timeout < TaskConstants.DEFAULT_NEVER_TIMEOUT) {
- throw new IllegalArgumentException(
- String.format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout));
+ throw new IllegalArgumentException(String
+ .format("Job %s, %s has invalid value %s", _jobId, JobConfigProperty.Timeout,
+ _timeout));
}
if (_timeoutPerTask < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ throw new IllegalArgumentException(String.format("Job %s, %s has invalid value %s", _jobId,
JobConfigProperty.TimeoutPerPartition, _timeoutPerTask));
}
if (_numConcurrentTasksPerInstance < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ throw new IllegalArgumentException(String.format("Job %s, %s has invalid value %s", _jobId,
JobConfigProperty.ConcurrentTasksPerInstance, _numConcurrentTasksPerInstance));
}
if (_maxAttemptsPerTask < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- JobConfigProperty.MaxAttemptsPerTask, _maxAttemptsPerTask));
+ throw new IllegalArgumentException(String
+ .format("Job %s, %s has invalid value %s", _jobId, JobConfigProperty.MaxAttemptsPerTask,
+ _maxAttemptsPerTask));
}
if (_maxForcedReassignmentsPerTask < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ throw new IllegalArgumentException(String.format("Job %s, %s has invalid value %s", _jobId,
JobConfigProperty.MaxForcedReassignmentsPerTask, _maxForcedReassignmentsPerTask));
}
if (_failureThreshold < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- JobConfigProperty.FailureThreshold, _failureThreshold));
+ throw new IllegalArgumentException(String
+ .format("Job %s, %s has invalid value %s", _jobId, JobConfigProperty.FailureThreshold,
+ _failureThreshold));
}
if (_workflow == null) {
throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.WorkflowID));
+ String.format("Job %s, %s cannot be null", _jobId, JobConfigProperty.WorkflowID));
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 928382d..4b0e00f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -103,7 +103,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
}
if (!isWorkflowReadyForSchedule(workflowCfg)) {
- LOG.info("Job is not ready to be run since workflow is not ready {}", jobName);
+ LOG.info("Job {} is not ready to be run since workflow is not ready.", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -111,7 +111,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
_dataProvider.getJobConfigMap(), _dataProvider,
_dataProvider.getAssignableInstanceManager())) {
- LOG.info("Job is not ready to run {}", jobName);
+ LOG.info("Job {} is not ready to run.", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -145,7 +145,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
: _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
- LOG.error("No available instance found for job: " + jobName);
+ LOG.error("No available instance found for job: {}", jobName);
}
TargetState jobTgtState = workflowCfg.getTargetState();
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index a3c3435..4c96e83 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -47,7 +47,7 @@ public class JobRebalancer extends TaskRebalancer {
jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
- LOG.debug("Computer Best Partition for job: " + jobName);
+ LOG.debug("Computer Best Partition for job: {}.", jobName);
if (_jobDispatcher == null) {
_jobDispatcher = new JobDispatcher();
}
@@ -56,8 +56,8 @@ public class JobRebalancer extends TaskRebalancer {
_jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
ResourceAssignment resourceAssignment = _jobDispatcher.processJobStatusUpdateAndAssignment(
jobName, currStateOutput, clusterData.getWorkflowContext(jobConfig.getWorkflow()));
- LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s",
- System.currentTimeMillis() - startTime, jobName));
+ LOG.debug("JobRebalancer computation takes {} ms for Job {}",
+ System.currentTimeMillis() - startTime, jobName);
return resourceAssignment;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index 5c3379a..24a17da 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -149,7 +149,8 @@ public class RuntimeJobDag extends JobDag {
if (!_inflightJobList.remove(job)) {
// this job is not in in-flight list
LOG.warn(
- String.format("Job: %s has either finished already, never been scheduled, or been removed from DAG", job));
+ "Job: {} has either finished already, never been scheduled, or been removed from DAG",
+ job);
}
// Add finished job's successors to ready-list
diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
index 1daa386..de2d6f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
@@ -111,7 +111,7 @@ public class ScheduleConfig {
if (_recurUnit != null) {
long converted = _recurUnit.toMillis(_recurInterval);
if (converted < MIN_RECURRENCE_MILLIS) {
- LOG.error("Recurrence must be at least " + MIN_RECURRENCE_MILLIS + " ms");
+ LOG.error("Recurrence must be at least {} ms", MIN_RECURRENCE_MILLIS);
return false;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c8ed25d..0e1b9b1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -140,7 +140,7 @@ public class TaskDriver {
* @param flow
*/
public void start(Workflow flow) {
- LOG.info("Starting workflow " + flow.getName());
+ LOG.info("Starting workflow {}", flow.getName());
flow.validate();
validateZKNodeLimitation(flow.getJobConfigs().keySet().size() + 1);
@@ -222,7 +222,7 @@ public class TaskDriver {
// Should not let user changing DAG in the workflow
newWorkflowConfig.setJobDag(currentConfig.getJobDag());
if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
- LOG.error("Failed to update workflow configuration for workflow " + workflow);
+ LOG.error("Failed to update workflow configuration for workflow {}", workflow);
}
}
@@ -351,14 +351,13 @@ public class TaskDriver {
if (workflowState.equals(TaskState.COMPLETED.name())
|| workflowState.equals(TaskState.FAILED.name())
|| workflowState.equals(TaskState.ABORTED.name())) {
- LOG.warn(
- "Queue " + queue + " has already reached its final state, skip deleting job from it.");
+ LOG.warn("Queue {} has already reached its final state, skip deleting job from it.", queue);
return;
}
if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue,
Collections.singleton(TaskUtil.getNamespacedJobName(queue, job)), true)) {
- LOG.error("Failed to delete job " + job + " from queue " + queue);
+ LOG.error("Failed to delete job {} from queue {}.", job, queue);
throw new HelixException("Failed to delete job " + job + " from queue " + queue);
}
}
@@ -401,12 +400,12 @@ public class TaskDriver {
Set<String> expiredJobs =
TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
- LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
+ LOG.warn("Failed to clean up expired and completed jobs from queue {}", queue);
}
}
workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig.getJobDag().size() >= capacity) {
- throw new HelixException("Failed to enqueue a job, queue is full.");
+ throw new HelixException(String.format( "Failed to enqueue job, queue %s is full.", queue));
}
}
@@ -726,7 +725,7 @@ public class TaskDriver {
if (currentData != null) {
Map<String, Map<String, String>> taskMap = currentData.getMapFields();
if (taskMap == null) {
- LOG.warn("Could not update the jobConfig: " + jobName + " Znode MapField is null.");
+ LOG.warn("Could not update the jobConfig: {} Znode MapField is null.", jobName );
return null;
}
Map<String, Map<String, String>> newTaskMap = new HashMap<String, Map<String, String>>();
@@ -804,7 +803,9 @@ public class TaskDriver {
}
if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) {
- throw new HelixException("Command must exist in either job or task, not both!");
+ throw new HelixException(String
+ .format("Command must exist in either jobconfig (%s) or taskconfig (%s), not both!", jobName,
+ taskConfig.getId()));
}
}
@@ -1065,7 +1066,7 @@ public class TaskDriver {
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
if (workflowConfig == null) {
- LOG.warn("WorkflowConfig for " + workflow + " not found!");
+ LOG.warn("WorkflowConfig for {} not found!", workflow);
return;
}
@@ -1073,8 +1074,8 @@ public class TaskDriver {
if (state != TargetState.DELETE && workflowContext != null
&& workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
// Should not update target state for completed workflow
- LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
- + state);
+ LOG.info("Workflow {} is already completed, skip to update its target state {}", workflow,
+ state);
return;
}
@@ -1083,7 +1084,9 @@ public class TaskDriver {
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
state.name());
} else {
- LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is null.");
+ LOG.warn(
+ "TargetState DataUpdater: Fails to update target state for {}. CurrentData is null.",
+ workflow);
}
return currentData;
};
@@ -1173,7 +1176,7 @@ public class TaskDriver {
if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
throw new HelixException(String.format(
- "Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
+ "Workflow %s context is empty or not in states: %s, current state: %s.",
workflowName, Arrays.asList(targetStates),
ctx == null ? "null" : ctx.getWorkflowState().toString()));
}
@@ -1212,7 +1215,7 @@ public class TaskDriver {
WorkflowConfig workflowConfig = getWorkflowConfig(workflowName);
if (workflowConfig == null) {
- throw new HelixException(String.format("Workflow \"%s\" does not exists!", workflowName));
+ throw new HelixException(String.format("Workflow %s does not exists!", workflowName));
}
long timeToSleep = timeout > 50L ? 50L : timeout;
@@ -1244,7 +1247,7 @@ public class TaskDriver {
JobConfig jobConfig = getJobConfig(jobName);
JobContext jbCtx = getJobContext(jobName);
throw new HelixException(String.format(
- "Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
+ "Workflow %s context is null or job %s is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
workflowName, jobName, allowedStates, ctx == null ? "null" : ctx,
ctx != null ? ctx.getJobState(jobName) : "null", wfcfg, jobConfig, jbCtx));
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 705f118..3771a57 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -89,7 +89,7 @@ public class TaskStateModel extends StateModel {
_taskRunner.cancel();
TaskResult r = _taskRunner.waitTillDone();
- LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
+ LOG.info("Task {} completed with result {}.", msg.getPartitionName(), r);
timeout_task.cancel(false);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index dafd386..e2de939 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -678,19 +678,16 @@ public class TaskUtil {
}
if (!removeWorkflowConfig(accessor, workflow)) {
- LOG.warn(
- String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+ LOG.warn("Error occurred while trying to remove workflow config for {}.", workflow);
return false;
}
if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
- LOG.warn(String.format(
- "Error occurred while trying to remove workflow idealstate/externalview for %s.",
- workflow));
+ LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
+ workflow);
return false;
}
if (!removeWorkflowContext(propertyStore, workflow)) {
- LOG.warn(String.format("Error occurred while trying to remove workflow context for %s.",
- workflow));
+ LOG.warn("Error occurred while trying to remove workflow context for {}.", workflow);
return false;
}
return true;
@@ -712,13 +709,13 @@ public class TaskUtil {
final Set<String> jobs, boolean maintainDependency) {
boolean success = true;
if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) {
- LOG.warn("Error occurred while trying to remove jobs + " + jobs + " from the workflow "
- + workflow);
+ LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}.", jobs,
+ workflow);
success = false;
}
if (!removeJobsState(propertyStore, workflow, jobs)) {
- LOG.warn("Error occurred while trying to remove jobs states from workflow + " + workflow
- + " jobs " + jobs);
+ LOG.warn("Error occurred while trying to remove jobs states from workflow {} jobs {}.",
+ workflow, jobs);
success = false;
}
for (String job : jobs) {
@@ -865,12 +862,12 @@ public class TaskUtil {
return false;
}
if (!cleanupJobIdealStateExtView(accessor, job)) {
- LOG.warn(String.format(
- "Error occurred while trying to remove job idealstate/externalview for %s.", job));
+ LOG.warn(
+ "Error occurred while trying to remove job idealstate/externalview for {}.", job);
return false;
}
if (!removeJobContext(propertyStore, job)) {
- LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
+ LOG.warn("Error occurred while trying to remove job context for {}.", job);
return false;
}
return true;
@@ -888,7 +885,7 @@ public class TaskUtil {
JobDag jobDag = JobDag.fromJson(
currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
if (jobDag == null) {
- LOG.warn("Could not update DAG for workflow: " + workflow + " JobDag is null.");
+ LOG.warn("Could not update DAG for workflow: {} JobDag is null.", workflow);
return null;
}
for (String job : jobsToRemove) {
@@ -907,7 +904,7 @@ public class TaskUtil {
String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath();
if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) {
- LOG.warn("Failed to remove jobs " + jobsToRemove + " from DAG of workflow " + workflow);
+ LOG.warn("Failed to remove jobs {} from DAG of workflow {}", jobsToRemove, workflow);
return false;
}
@@ -940,7 +937,7 @@ public class TaskUtil {
}
};
if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) {
- LOG.warn("Fail to remove job state for jobs " + jobs + " from workflow " + workflow);
+ LOG.warn("Fail to remove job state for jobs {} from workflow {}", jobs, workflow);
return false;
}
return true;
@@ -951,9 +948,9 @@ public class TaskUtil {
String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource);
if (propertyStore.exists(path, AccessOption.PERSISTENT)) {
if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
- LOG.warn(String.format(
- "Error occurred while trying to remove workflow/jobcontext for %s. Failed to remove node %s.",
- workflowJobResource, path));
+ LOG.warn(
+ "Error occurred while trying to remove workflow/jobcontext for {}. Failed to remove node {}.",
+ workflowJobResource, path);
return false;
}
}
@@ -971,9 +968,8 @@ public class TaskUtil {
PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
if (accessor.getPropertyStat(cfgKey) != null) {
if (!accessor.removeProperty(cfgKey)) {
- LOG.warn(String.format(
- "Error occurred while trying to remove config for %s. Failed to remove node %s.",
- workflowJobResource, cfgKey));
+ LOG.warn("Error occurred while trying to remove config for {}. Failed to remove node {}.",
+ workflowJobResource, cfgKey);
return false;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 3396320..6e897c1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -142,7 +142,7 @@ public class WorkflowContext extends HelixProperty {
long ret = Long.valueOf(t);
return ret;
} catch (NumberFormatException e) {
- LOG.warn("Number error " + t + " for job start time of " + job);
+ LOG.warn("Number error {} for job start time of {}.", t, job, e);
return -1;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 5dcf3b2..09eb51b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -41,7 +41,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
final String workflow = resource.getResourceName();
long startTime = System.currentTimeMillis();
- LOG.debug("Computer Best Partition for workflow: " + workflow);
+ LOG.debug("Compute Best Partition for workflow: {}", workflow);
_workflowDispatcher.init(_manager);
WorkflowContext workflowCtx = _workflowDispatcher
.getOrInitializeWorkflowContext(workflow, clusterData.getTaskDataCache());
@@ -54,8 +54,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
_workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx, currStateOutput,
new BestPossibleStateOutput());
- LOG.debug(String.format("WorkflowRebalancer computation takes %d ms for workflow %s",
- System.currentTimeMillis() - startTime, workflow));
+ LOG.debug("WorkflowRebalancer computation takes {} ms for workflow {}",
+ System.currentTimeMillis() - startTime, workflow);
return buildEmptyAssignment(workflow, currStateOutput);
}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index 9f1abbb..3fe6523 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -143,7 +143,7 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
_driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
Assert.fail("Queue is not full.");
} catch (HelixException e) {
- Assert.assertTrue(e.getMessage().contains("queue is full"));
+ Assert.assertTrue(e.getMessage().contains("queue " + queue + " is full"));
}
for (int i = 0; i < capacity; i++) {