You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/10/28 08:22:38 UTC
[flink] 08/08: [FLINK-13904][checkpointing] Encapsulate and optimize
the time-related operations of CheckpointCoordinator
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit beb3fb06bdca64c4732318667ab59ce298da3b97
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Fri Oct 11 16:40:27 2019 +0800
[FLINK-13904][checkpointing] Encapsulate and optimize the time-related operations of CheckpointCoordinator
---
.../runtime/checkpoint/CheckpointCoordinator.java | 61 ++++++++++++++++++----
1 file changed, 50 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7517a38..6d34d8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -130,9 +132,9 @@ public class CheckpointCoordinator {
/** The max time (in ms) that a checkpoint may take. */
private final long checkpointTimeout;
- /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
+ /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
- private final long minPauseBetweenCheckpointsNanos;
+ private final long minPauseBetweenCheckpoints;
/** The maximum number of checkpoints that may be in progress at the same time. */
private final int maxConcurrentCheckpointAttempts;
@@ -153,8 +155,9 @@ public class CheckpointCoordinator {
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture<?> currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The timestamp (via {@link Clock#relativeTimeMillis()}) when the last checkpoint
+ * completed. */
+ private long lastCheckpointCompletionRelativeTime;
/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
@@ -181,9 +184,42 @@ public class CheckpointCoordinator {
private final CheckpointFailureManager failureManager;
+ private final Clock clock;
+
// --------------------------------------------------------------------------------------------
public CheckpointCoordinator(
+ JobID job,
+ CheckpointCoordinatorConfiguration chkConfig,
+ ExecutionVertex[] tasksToTrigger,
+ ExecutionVertex[] tasksToWaitFor,
+ ExecutionVertex[] tasksToCommitTo,
+ CheckpointIDCounter checkpointIDCounter,
+ CompletedCheckpointStore completedCheckpointStore,
+ StateBackend checkpointStateBackend,
+ Executor executor,
+ ScheduledExecutor timer,
+ SharedStateRegistryFactory sharedStateRegistryFactory,
+ CheckpointFailureManager failureManager) {
+
+ this(
+ job,
+ chkConfig,
+ tasksToTrigger,
+ tasksToWaitFor,
+ tasksToCommitTo,
+ checkpointIDCounter,
+ completedCheckpointStore,
+ checkpointStateBackend,
+ executor,
+ timer,
+ sharedStateRegistryFactory,
+ failureManager,
+ SystemClock.getInstance());
+ }
+
+ @VisibleForTesting
+ public CheckpointCoordinator(
JobID job,
CheckpointCoordinatorConfiguration chkConfig,
ExecutionVertex[] tasksToTrigger,
@@ -195,7 +231,8 @@ public class CheckpointCoordinator {
Executor executor,
ScheduledExecutor timer,
SharedStateRegistryFactory sharedStateRegistryFactory,
- CheckpointFailureManager failureManager) {
+ CheckpointFailureManager failureManager,
+ Clock clock) {
// sanity checks
checkNotNull(checkpointStateBackend);
@@ -216,7 +253,7 @@ public class CheckpointCoordinator {
this.job = checkNotNull(job);
this.baseInterval = baseInterval;
this.checkpointTimeout = chkConfig.getCheckpointTimeout();
- this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints();
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -229,6 +266,7 @@ public class CheckpointCoordinator {
this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
this.failureManager = checkNotNull(failureManager);
+ this.clock = checkNotNull(clock);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -890,7 +928,7 @@ public class CheckpointCoordinator {
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
- lastCheckpointCompletionNanos = System.nanoTime();
+ lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
@@ -1253,8 +1291,10 @@ public class CheckpointCoordinator {
* @throws CheckpointException If the minimum interval between checkpoints has not passed.
*/
private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
- final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
- final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+ final long nextCheckpointTriggerRelativeTime =
+ lastCheckpointCompletionRelativeTime + minPauseBetweenCheckpoints;
+ final long durationTillNextMillis =
+ nextCheckpointTriggerRelativeTime - clock.relativeTimeMillis();
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
@@ -1269,8 +1309,7 @@ public class CheckpointCoordinator {
}
private long getRandomInitDelay() {
- return ThreadLocalRandom.current().nextLong(
- minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
+ return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
}
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {