Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 16:30:14 UTC
flink git commit: [FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages. This closes #5872.
Repository: flink
Updated Branches:
refs/heads/release-1.4 602b0cabc -> e3433d930
[FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages

This closes #5872.
This closes #5873.
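
The change itself is mechanical: every log statement in CheckpointCoordinator gains the job ID, and string concatenation is replaced by SLF4J's parameterized {} placeholders, as the diff below shows. The following is a minimal, self-contained sketch of that pattern only; the class and variable names are illustrative stand-ins, not code from the commit, and it assumes SLF4J 1.6+ semantics where a trailing Throwable without a matching placeholder is logged with its stack trace.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;

// Illustrative stand-in class, not Flink's CheckpointCoordinator.
public class JobScopedLoggingExample {

    private static final Logger LOG = LoggerFactory.getLogger(JobScopedLoggingExample.class);

    public static void main(String[] args) {
        String job = UUID.randomUUID().toString();   // stand-in for Flink's JobID
        long checkpointId = 42;
        long timestamp = System.currentTimeMillis();

        // Parameterized message: each {} is filled from the trailing arguments,
        // and the formatted string is only built if the INFO level is enabled.
        LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointId, timestamp, job);

        // If the last argument is a Throwable and has no matching {},
        // SLF4J treats it as the exception and logs its stack trace
        // instead of substituting it into the message.
        Throwable t = new Exception("simulated trigger failure");
        int numUnsuccessful = 3;
        LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
            job, numUnsuccessful, t);
    }
}

Compared with building the message via string concatenation, this keeps the job ID out of the format string's construction cost when the level is disabled, which is why the commit switches the remaining concatenated messages over as well.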
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3433d93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3433d93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3433d93
Branch: refs/heads/release-1.4
Commit: e3433d9300ad4ae35693ff76c2de987a37724c2f
Parents: 602b0ca
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 15:01:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:09:40 2018 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 57 +++++++++++---------
1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e3433d93/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9a4456e..c65acb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -327,7 +327,7 @@ public class CheckpointCoordinator {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
- LOG.info("Stopping checkpoint coordinator for job " + job);
+ LOG.info("Stopping checkpoint coordinator for job {}.", job);
periodicScheduling = false;
triggerRequestQueued = false;
@@ -481,7 +481,7 @@ public class CheckpointCoordinator {
if (!props.forceCheckpoint()) {
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
@@ -522,8 +522,9 @@ public class CheckpointCoordinator {
if (ee != null && ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
- LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
- tasksToTrigger[i].getTaskNameWithSubtaskIndex());
+ LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+ tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+ job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -537,8 +538,9 @@ public class CheckpointCoordinator {
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
- LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
- ev.getTaskNameWithSubtaskIndex());
+ LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+ ev.getTaskNameWithSubtaskIndex(),
+ job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -558,7 +560,10 @@ public class CheckpointCoordinator {
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+ LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
+ job,
+ numUnsuccessful,
+ t);
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
@@ -588,7 +593,7 @@ public class CheckpointCoordinator {
// only do the work if the checkpoint is not discarded anyways
// note that checkpoint completion discards the pending checkpoint object
if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+ LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
checkpoint.abortExpired();
pendingCheckpoints.remove(checkpointID);
@@ -610,7 +615,7 @@ public class CheckpointCoordinator {
}
else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
@@ -642,7 +647,7 @@ public class CheckpointCoordinator {
}
}
- LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+ LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
pendingCheckpoints.put(checkpointID, checkpoint);
@@ -686,8 +691,8 @@ public class CheckpointCoordinator {
}
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
- checkpointID, numUnsuccessful, t);
+ LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+ checkpointID, job, numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
@@ -727,7 +732,7 @@ public class CheckpointCoordinator {
checkpoint = pendingCheckpoints.remove(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
- LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId());
+ LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
discardCheckpoint(checkpoint, message.getReason());
}
else if (checkpoint != null) {
@@ -738,12 +743,12 @@ public class CheckpointCoordinator {
else if (LOG.isDebugEnabled()) {
if (recentPendingCheckpoints.contains(checkpointId)) {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
- LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
- checkpointId, reason);
+ LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
+ checkpointId, job, reason);
} else {
// message is for an unknown checkpoint. might be so old that we don't even remember it any more
- LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
- checkpointId, reason);
+ LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
+ checkpointId, job, reason);
}
}
}
@@ -892,7 +897,7 @@ public class CheckpointCoordinator {
try {
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
- LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+ LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
}
}
});
@@ -915,7 +920,7 @@ public class CheckpointCoordinator {
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
- LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+ LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
if (LOG.isDebugEnabled()) {
@@ -1065,7 +1070,7 @@ public class CheckpointCoordinator {
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}
- LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+ LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
// Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1078,7 +1083,7 @@ public class CheckpointCoordinator {
}
}
- LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+ LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);
// re-assign the task states
final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
@@ -1134,8 +1139,8 @@ public class CheckpointCoordinator {
Preconditions.checkNotNull(savepointPath, "The savepoint path cannot be null.");
- LOG.info("Starting job from savepoint {} ({})",
- savepointPath, (allowNonRestored ? "allowing non restored state" : ""));
+ LOG.info("Starting job {} from savepoint {} ({})",
+ job, savepointPath, (allowNonRestored ? "allowing non restored state" : ""));
// Load the savepoint as a checkpoint into the system
CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
@@ -1147,7 +1152,7 @@ public class CheckpointCoordinator {
long nextCheckpointId = savepoint.getCheckpointID() + 1;
checkpointIdCounter.setCount(nextCheckpointId);
- LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
+ LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
}
@@ -1266,7 +1271,7 @@ public class CheckpointCoordinator {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
- LOG.error("Exception while triggering checkpoint.", e);
+ LOG.error("Exception while triggering checkpoint for job {}.", job, e);
}
}
}
@@ -1285,7 +1290,7 @@ public class CheckpointCoordinator {
final String reason = (cause != null) ? cause.getMessage() : "";
- LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason);
+ LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);