You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:27 UTC

[7/9] flink git commit: [FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages This closes #5872.

[FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages

This closes #5872.

This closes #5873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/727370aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/727370aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/727370aa

Branch: refs/heads/master
Commit: 727370aacf63cefc6aed7c46dc2d63517e4b708d
Parents: b0ee05e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 15:01:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 57 +++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/727370aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 59916fd..4ddac003 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -316,7 +316,7 @@ public class CheckpointCoordinator {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
-				LOG.info("Stopping checkpoint coordinator for job " + job);
+				LOG.info("Stopping checkpoint coordinator for job {}.", job);
 
 				periodicScheduling = false;
 				triggerRequestQueued = false;
@@ -414,7 +414,7 @@ public class CheckpointCoordinator {
 			if (!props.forceCheckpoint()) {
 				// sanity check: there should never be more than one trigger request queued
 				if (triggerRequestQueued) {
-					LOG.warn("Trying to trigger another checkpoint while one was queued already");
+					LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 					return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 				}
 
@@ -455,8 +455,9 @@ public class CheckpointCoordinator {
 			if (ee != null && ee.getState() == ExecutionState.RUNNING) {
 				executions[i] = ee;
 			} else {
-				LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
-						tasksToTrigger[i].getTaskNameWithSubtaskIndex());
+				LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -470,8 +471,9 @@ public class CheckpointCoordinator {
 			if (ee != null) {
 				ackTasks.put(ee.getAttemptId(), ev);
 			} else {
-				LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
-						ev.getTaskNameWithSubtaskIndex());
+				LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+						ev.getTaskNameWithSubtaskIndex(),
+						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -498,7 +500,10 @@ public class CheckpointCoordinator {
 			}
 			catch (Throwable t) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+				LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
+						job,
+						numUnsuccessful,
+						t);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
@@ -526,7 +531,7 @@ public class CheckpointCoordinator {
 					// only do the work if the checkpoint is not discarded anyways
 					// note that checkpoint completion discards the pending checkpoint object
 					if (!checkpoint.isDiscarded()) {
-						LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+						LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
 
 						checkpoint.abortExpired();
 						pendingCheckpoints.remove(checkpointID);
@@ -547,7 +552,7 @@ public class CheckpointCoordinator {
 					}
 					else if (!props.forceCheckpoint()) {
 						if (triggerRequestQueued) {
-							LOG.warn("Trying to trigger another checkpoint while one was queued already");
+							LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 							return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 						}
 
@@ -579,7 +584,7 @@ public class CheckpointCoordinator {
 						}
 					}
 
-					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+					LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
 
 					pendingCheckpoints.put(checkpointID, checkpoint);
 
@@ -620,8 +625,8 @@ public class CheckpointCoordinator {
 				}
 
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-				LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
-						checkpointID, numUnsuccessful, t);
+				LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+						checkpointID, job, numUnsuccessful, t);
 
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
@@ -673,7 +678,7 @@ public class CheckpointCoordinator {
 			checkpoint = pendingCheckpoints.remove(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId());
+				LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
 				discardCheckpoint(checkpoint, message.getReason());
 			}
 			else if (checkpoint != null) {
@@ -684,12 +689,12 @@ public class CheckpointCoordinator {
 			else if (LOG.isDebugEnabled()) {
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
-					LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
-							checkpointId, reason);
+					LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
+							checkpointId, job, reason);
 				} else {
 					// message is for an unknown checkpoint. might be so old that we don't even remember it any more
-					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
-							checkpointId, reason);
+					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
+							checkpointId, job, reason);
 				}
 			}
 		}
@@ -834,7 +839,7 @@ public class CheckpointCoordinator {
 							try {
 								completedCheckpoint.discardOnFailedStoring();
 							} catch (Throwable t) {
-								LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+								LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
 							}
 						}
 					});
@@ -857,7 +862,7 @@ public class CheckpointCoordinator {
 		// the 'min delay between checkpoints'
 		lastCheckpointCompletionNanos = System.nanoTime();
 
-		LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+		LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
 			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
 
 		if (LOG.isDebugEnabled()) {
@@ -1007,7 +1012,7 @@ public class CheckpointCoordinator {
 				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 			}
 
-			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+			LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
 
 			// Restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1020,7 +1025,7 @@ public class CheckpointCoordinator {
 				}
 			}
 
-			LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+			LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);
 
 			// re-assign the task states
 			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
@@ -1076,8 +1081,8 @@ public class CheckpointCoordinator {
 
 		Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
 
-		LOG.info("Starting job from savepoint {} ({})",
-				savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
+		LOG.info("Starting job {} from savepoint {} ({})",
+				job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
 
 		final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);
 
@@ -1091,7 +1096,7 @@ public class CheckpointCoordinator {
 		long nextCheckpointId = savepoint.getCheckpointID() + 1;
 		checkpointIdCounter.setCount(nextCheckpointId);
 
-		LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
+		LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
 
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
 	}
@@ -1214,7 +1219,7 @@ public class CheckpointCoordinator {
 				triggerCheckpoint(System.currentTimeMillis(), true);
 			}
 			catch (Exception e) {
-				LOG.error("Exception while triggering checkpoint.", e);
+				LOG.error("Exception while triggering checkpoint for job {}.", job, e);
 			}
 		}
 	}
@@ -1233,7 +1238,7 @@ public class CheckpointCoordinator {
 
 		final String reason = (cause != null) ? cause.getMessage() : "";
 
-		LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason);
+		LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
 
 		pendingCheckpoint.abortDeclined();
 		rememberRecentCheckpointId(checkpointId);