You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:27 UTC
[7/9] flink git commit: [FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages. This closes #5872.
[FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages. This closes #5872.
This closes #5873.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/727370aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/727370aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/727370aa
Branch: refs/heads/master
Commit: 727370aacf63cefc6aed7c46dc2d63517e4b708d
Parents: b0ee05e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 15:01:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 57 +++++++++++---------
1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/727370aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 59916fd..4ddac003 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -316,7 +316,7 @@ public class CheckpointCoordinator {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
- LOG.info("Stopping checkpoint coordinator for job " + job);
+ LOG.info("Stopping checkpoint coordinator for job {}.", job);
periodicScheduling = false;
triggerRequestQueued = false;
@@ -414,7 +414,7 @@ public class CheckpointCoordinator {
if (!props.forceCheckpoint()) {
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
@@ -455,8 +455,9 @@ public class CheckpointCoordinator {
if (ee != null && ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
- LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
- tasksToTrigger[i].getTaskNameWithSubtaskIndex());
+ LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+ tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+ job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -470,8 +471,9 @@ public class CheckpointCoordinator {
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
- LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
- ev.getTaskNameWithSubtaskIndex());
+ LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+ ev.getTaskNameWithSubtaskIndex(),
+ job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -498,7 +500,10 @@ public class CheckpointCoordinator {
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+ LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
+ job,
+ numUnsuccessful,
+ t);
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
@@ -526,7 +531,7 @@ public class CheckpointCoordinator {
// only do the work if the checkpoint is not discarded anyways
// note that checkpoint completion discards the pending checkpoint object
if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+ LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
checkpoint.abortExpired();
pendingCheckpoints.remove(checkpointID);
@@ -547,7 +552,7 @@ public class CheckpointCoordinator {
}
else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
@@ -579,7 +584,7 @@ public class CheckpointCoordinator {
}
}
- LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+ LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
pendingCheckpoints.put(checkpointID, checkpoint);
@@ -620,8 +625,8 @@ public class CheckpointCoordinator {
}
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
- checkpointID, numUnsuccessful, t);
+ LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+ checkpointID, job, numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
@@ -673,7 +678,7 @@ public class CheckpointCoordinator {
checkpoint = pendingCheckpoints.remove(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
- LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId());
+ LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
discardCheckpoint(checkpoint, message.getReason());
}
else if (checkpoint != null) {
@@ -684,12 +689,12 @@ public class CheckpointCoordinator {
else if (LOG.isDebugEnabled()) {
if (recentPendingCheckpoints.contains(checkpointId)) {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
- LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
- checkpointId, reason);
+ LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
+ checkpointId, job, reason);
} else {
// message is for an unknown checkpoint. might be so old that we don't even remember it any more
- LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
- checkpointId, reason);
+ LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
+ checkpointId, job, reason);
}
}
}
@@ -834,7 +839,7 @@ public class CheckpointCoordinator {
try {
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
- LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+ LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
}
}
});
@@ -857,7 +862,7 @@ public class CheckpointCoordinator {
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
- LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+ LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
if (LOG.isDebugEnabled()) {
@@ -1007,7 +1012,7 @@ public class CheckpointCoordinator {
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}
- LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+ LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
// Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1020,7 +1025,7 @@ public class CheckpointCoordinator {
}
}
- LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+ LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);
// re-assign the task states
final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
@@ -1076,8 +1081,8 @@ public class CheckpointCoordinator {
Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
- LOG.info("Starting job from savepoint {} ({})",
- savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
+ LOG.info("Starting job {} from savepoint {} ({})",
+ job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);
@@ -1091,7 +1096,7 @@ public class CheckpointCoordinator {
long nextCheckpointId = savepoint.getCheckpointID() + 1;
checkpointIdCounter.setCount(nextCheckpointId);
- LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
+ LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
}
@@ -1214,7 +1219,7 @@ public class CheckpointCoordinator {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
- LOG.error("Exception while triggering checkpoint.", e);
+ LOG.error("Exception while triggering checkpoint for job {}.", job, e);
}
}
}
@@ -1233,7 +1238,7 @@ public class CheckpointCoordinator {
final String reason = (cause != null) ? cause.getMessage() : "";
- LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason);
+ LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);