You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/10/28 08:22:38 UTC

[flink] 08/08: [FLINK-13904][checkpointing] Encapsulate and optimize the time-related operations of CheckpointCoordinator

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit beb3fb06bdca64c4732318667ab59ce298da3b97
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Fri Oct 11 16:40:27 2019 +0800

    [FLINK-13904][checkpointing] Encapsulate and optimize the time-related operations of CheckpointCoordinator
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 61 ++++++++++++++++++----
 1 file changed, 50 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7517a38..6d34d8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -130,9 +132,9 @@ public class CheckpointCoordinator {
 	/** The max time (in ms) that a checkpoint may take. */
 	private final long checkpointTimeout;
 
-	/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
+	/** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
 	 * enforce minimum processing time between checkpoint attempts */
-	private final long minPauseBetweenCheckpointsNanos;
+	private final long minPauseBetweenCheckpoints;
 
 	/** The maximum number of checkpoints that may be in progress at the same time. */
 	private final int maxConcurrentCheckpointAttempts;
@@ -153,8 +155,9 @@ public class CheckpointCoordinator {
 	/** A handle to the current periodic trigger, to cancel it when necessary. */
 	private ScheduledFuture<?> currentPeriodicTrigger;
 
-	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
-	private long lastCheckpointCompletionNanos;
+	/** The timestamp (via {@link Clock#relativeTimeMillis()}) when the last checkpoint
+	 * completed. */
+	private long lastCheckpointCompletionRelativeTime;
 
 	/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
 	 * Non-volatile, because only accessed in synchronized scope */
@@ -181,9 +184,42 @@ public class CheckpointCoordinator {
 
 	private final CheckpointFailureManager failureManager;
 
+	private final Clock clock;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
+		JobID job,
+		CheckpointCoordinatorConfiguration chkConfig,
+		ExecutionVertex[] tasksToTrigger,
+		ExecutionVertex[] tasksToWaitFor,
+		ExecutionVertex[] tasksToCommitTo,
+		CheckpointIDCounter checkpointIDCounter,
+		CompletedCheckpointStore completedCheckpointStore,
+		StateBackend checkpointStateBackend,
+		Executor executor,
+		ScheduledExecutor timer,
+		SharedStateRegistryFactory sharedStateRegistryFactory,
+		CheckpointFailureManager failureManager) {
+
+		this(
+			job,
+			chkConfig,
+			tasksToTrigger,
+			tasksToWaitFor,
+			tasksToCommitTo,
+			checkpointIDCounter,
+			completedCheckpointStore,
+			checkpointStateBackend,
+			executor,
+			timer,
+			sharedStateRegistryFactory,
+			failureManager,
+			SystemClock.getInstance());
+	}
+
+	@VisibleForTesting
+	public CheckpointCoordinator(
 			JobID job,
 			CheckpointCoordinatorConfiguration chkConfig,
 			ExecutionVertex[] tasksToTrigger,
@@ -195,7 +231,8 @@ public class CheckpointCoordinator {
 			Executor executor,
 			ScheduledExecutor timer,
 			SharedStateRegistryFactory sharedStateRegistryFactory,
-			CheckpointFailureManager failureManager) {
+			CheckpointFailureManager failureManager,
+			Clock clock) {
 
 		// sanity checks
 		checkNotNull(checkpointStateBackend);
@@ -216,7 +253,7 @@ public class CheckpointCoordinator {
 		this.job = checkNotNull(job);
 		this.baseInterval = baseInterval;
 		this.checkpointTimeout = chkConfig.getCheckpointTimeout();
-		this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
+		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints();
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -229,6 +266,7 @@ public class CheckpointCoordinator {
 		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
 		this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
 		this.failureManager = checkNotNull(failureManager);
+		this.clock = checkNotNull(clock);
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -890,7 +928,7 @@ public class CheckpointCoordinator {
 
 		// record the time when this was completed, to calculate
 		// the 'min delay between checkpoints'
-		lastCheckpointCompletionNanos = System.nanoTime();
+		lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
 
 		LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
 			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
@@ -1253,8 +1291,10 @@ public class CheckpointCoordinator {
 	 * @throws CheckpointException If the minimum interval between checkpoints has not passed.
 	 */
 	private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
-		final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-		final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+		final long nextCheckpointTriggerRelativeTime =
+			lastCheckpointCompletionRelativeTime + minPauseBetweenCheckpoints;
+		final long durationTillNextMillis =
+			nextCheckpointTriggerRelativeTime - clock.relativeTimeMillis();
 
 		if (durationTillNextMillis > 0) {
 			if (currentPeriodicTrigger != null) {
@@ -1269,8 +1309,7 @@ public class CheckpointCoordinator {
 	}
 
 	private long getRandomInitDelay() {
-		return ThreadLocalRandom.current().nextLong(
-			minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
+		return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
 	}
 
 	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {