You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 16:30:14 UTC

flink git commit: [FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages. This closes #5872.

Repository: flink
Updated Branches:
  refs/heads/release-1.4 602b0cabc -> e3433d930


[FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages. This closes #5872.

This closes #5873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3433d93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3433d93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3433d93

Branch: refs/heads/release-1.4
Commit: e3433d9300ad4ae35693ff76c2de987a37724c2f
Parents: 602b0ca
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 15:01:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:09:40 2018 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 57 +++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3433d93/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9a4456e..c65acb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -327,7 +327,7 @@ public class CheckpointCoordinator {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
-				LOG.info("Stopping checkpoint coordinator for job " + job);
+				LOG.info("Stopping checkpoint coordinator for job {}.", job);
 
 				periodicScheduling = false;
 				triggerRequestQueued = false;
@@ -481,7 +481,7 @@ public class CheckpointCoordinator {
 			if (!props.forceCheckpoint()) {
 				// sanity check: there should never be more than one trigger request queued
 				if (triggerRequestQueued) {
-					LOG.warn("Trying to trigger another checkpoint while one was queued already");
+					LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 					return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 				}
 
@@ -522,8 +522,9 @@ public class CheckpointCoordinator {
 			if (ee != null && ee.getState() == ExecutionState.RUNNING) {
 				executions[i] = ee;
 			} else {
-				LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
-						tasksToTrigger[i].getTaskNameWithSubtaskIndex());
+				LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -537,8 +538,9 @@ public class CheckpointCoordinator {
 			if (ee != null) {
 				ackTasks.put(ee.getAttemptId(), ev);
 			} else {
-				LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
-						ev.getTaskNameWithSubtaskIndex());
+				LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+						ev.getTaskNameWithSubtaskIndex(),
+						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -558,7 +560,10 @@ public class CheckpointCoordinator {
 			}
 			catch (Throwable t) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+				LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
+						job,
+						numUnsuccessful,
+						t);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
@@ -588,7 +593,7 @@ public class CheckpointCoordinator {
 						// only do the work if the checkpoint is not discarded anyways
 						// note that checkpoint completion discards the pending checkpoint object
 						if (!checkpoint.isDiscarded()) {
-							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+							LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
 
 							checkpoint.abortExpired();
 							pendingCheckpoints.remove(checkpointID);
@@ -610,7 +615,7 @@ public class CheckpointCoordinator {
 					}
 					else if (!props.forceCheckpoint()) {
 						if (triggerRequestQueued) {
-							LOG.warn("Trying to trigger another checkpoint while one was queued already");
+							LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 							return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 						}
 
@@ -642,7 +647,7 @@ public class CheckpointCoordinator {
 						}
 					}
 
-					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+					LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
 
 					pendingCheckpoints.put(checkpointID, checkpoint);
 
@@ -686,8 +691,8 @@ public class CheckpointCoordinator {
 				}
 
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-				LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
-						checkpointID, numUnsuccessful, t);
+				LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+						checkpointID, job, numUnsuccessful, t);
 
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
@@ -727,7 +732,7 @@ public class CheckpointCoordinator {
 			checkpoint = pendingCheckpoints.remove(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId());
+				LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
 				discardCheckpoint(checkpoint, message.getReason());
 			}
 			else if (checkpoint != null) {
@@ -738,12 +743,12 @@ public class CheckpointCoordinator {
 			else if (LOG.isDebugEnabled()) {
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
-					LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
-							checkpointId, reason);
+					LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
+							checkpointId, job, reason);
 				} else {
 					// message is for an unknown checkpoint. might be so old that we don't even remember it any more
-					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
-							checkpointId, reason);
+					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
+							checkpointId, job, reason);
 				}
 			}
 		}
@@ -892,7 +897,7 @@ public class CheckpointCoordinator {
 							try {
 								completedCheckpoint.discardOnFailedStoring();
 							} catch (Throwable t) {
-								LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+								LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
 							}
 						}
 					});
@@ -915,7 +920,7 @@ public class CheckpointCoordinator {
 		// the 'min delay between checkpoints'
 		lastCheckpointCompletionNanos = System.nanoTime();
 
-		LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+		LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
 			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
 
 		if (LOG.isDebugEnabled()) {
@@ -1065,7 +1070,7 @@ public class CheckpointCoordinator {
 				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 			}
 
-			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+			LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
 
 			// Restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1078,7 +1083,7 @@ public class CheckpointCoordinator {
 				}
 			}
 
-			LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+			LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);
 
 			// re-assign the task states
 			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
@@ -1134,8 +1139,8 @@ public class CheckpointCoordinator {
 		
 		Preconditions.checkNotNull(savepointPath, "The savepoint path cannot be null.");
 		
-		LOG.info("Starting job from savepoint {} ({})", 
-				savepointPath, (allowNonRestored ? "allowing non restored state" : ""));
+		LOG.info("Starting job {} from savepoint {} ({})", 
+				job, savepointPath, (allowNonRestored ? "allowing non restored state" : ""));
 
 		// Load the savepoint as a checkpoint into the system
 		CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
@@ -1147,7 +1152,7 @@ public class CheckpointCoordinator {
 		long nextCheckpointId = savepoint.getCheckpointID() + 1;
 		checkpointIdCounter.setCount(nextCheckpointId);
 		
-		LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
+		LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
 
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
 	}
@@ -1266,7 +1271,7 @@ public class CheckpointCoordinator {
 				triggerCheckpoint(System.currentTimeMillis(), true);
 			}
 			catch (Exception e) {
-				LOG.error("Exception while triggering checkpoint.", e);
+				LOG.error("Exception while triggering checkpoint for job {}.", job, e);
 			}
 		}
 	}
@@ -1285,7 +1290,7 @@ public class CheckpointCoordinator {
 
 		final String reason = (cause != null) ? cause.getMessage() : "";
 
-		LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason);
+		LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
 
 		pendingCheckpoint.abortDeclined();
 		rememberRecentCheckpointId(checkpointId);