You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink ("here") that is not preserved in this copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/18 08:02:21 UTC
[flink] branch release-1.9 updated: [FLINK-13256] Ensure periodical
checkpointing continues when regional failover aborts pending checkpoints
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new b7bfafc [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
b7bfafc is described below
commit b7bfafca14cd6995a6a59d68d14b4bc3cf91cfb5
Author: Yun Tang <my...@live.com>
AuthorDate: Thu Jul 18 09:58:21 2019 +0200
[FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
This closes #9128.
(cherry picked from commit 1ec34249a0303ae64d049d177057ef9b6c413ab5)
---
.../runtime/checkpoint/CheckpointCoordinator.java | 139 ++++++++-------
.../executiongraph/failover/FailoverRegion.java | 2 +-
.../FailoverStrategyCheckpointCoordinatorTest.java | 186 +++++++++++++++++++++
3 files changed, 264 insertions(+), 63 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3dc5c1d..682685c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -478,32 +478,9 @@ public class CheckpointCoordinator {
throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
}
- // if too many checkpoints are currently in progress, we need to mark that a request is queued
- if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
- triggerRequestQueued = true;
- if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel(false);
- currentPeriodicTrigger = null;
- }
- throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
- }
+ checkConcurrentCheckpoints();
- // make sure the minimum interval between checkpoints has passed
- final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
- final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
- if (durationTillNextMillis > 0) {
- if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel(false);
- currentPeriodicTrigger = null;
- }
- // Reassign the new trigger to the currentPeriodicTrigger
- currentPeriodicTrigger = timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
- durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
- throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
- }
+ checkMinPauseBetweenCheckpoints();
}
}
@@ -623,32 +600,9 @@ public class CheckpointCoordinator {
throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
}
- if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
- triggerRequestQueued = true;
- if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel(false);
- currentPeriodicTrigger = null;
- }
- throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
- }
-
- // make sure the minimum interval between checkpoints has passed
- final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
- final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
- if (durationTillNextMillis > 0) {
- if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel(false);
- currentPeriodicTrigger = null;
- }
+ checkConcurrentCheckpoints();
- // Reassign the new trigger to the currentPeriodicTrigger
- currentPeriodicTrigger = timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
- durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
- throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
- }
+ checkMinPauseBetweenCheckpoints();
}
LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
@@ -1025,9 +979,7 @@ public class CheckpointCoordinator {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
}
- currentPeriodicTrigger = timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
- 0L, baseInterval, TimeUnit.MILLISECONDS);
+ currentPeriodicTrigger = scheduleTriggerWithDelay(0L);
}
else {
timer.execute(new ScheduledTrigger());
@@ -1224,6 +1176,11 @@ public class CheckpointCoordinator {
return checkpointTimeout;
}
+ @VisibleForTesting
+ boolean isCurrentPeriodicTriggerAvailable() { // whether currentPeriodicTrigger is non-null; unsynchronized read, intended for tests
+ return currentPeriodicTrigger != null;
+ }
+
/**
* Returns whether periodic checkpointing has been configured.
*
@@ -1247,10 +1204,7 @@ public class CheckpointCoordinator {
stopCheckpointScheduler();
periodicScheduling = true;
- long initialDelay = ThreadLocalRandom.current().nextLong(
- minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
- currentPeriodicTrigger = timer.scheduleAtFixedRate(
- new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
+ currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
}
}
@@ -1284,6 +1238,54 @@ public class CheckpointCoordinator {
}
}
+ /**
+ * Fails if the maximum number of concurrent checkpoints is already pending, marking a request as queued and cancelling the periodic trigger.
+ *
+ * @throws CheckpointException with reason {@code TOO_MANY_CONCURRENT_CHECKPOINTS} if {@code pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}.
+ */
+ private void checkConcurrentCheckpoints() throws CheckpointException { // NOTE(review): mutates trigger state without locking; presumably callers hold lock - confirm
+ if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+ triggerRequestQueued = true; // remember that a trigger request is queued
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel(false); // stop periodic triggering without interrupting a running trigger
+ currentPeriodicTrigger = null;
+ }
+ throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+ }
+ }
+
+ /**
+ * Fails if less than the minimum pause has passed since {@code lastCheckpointCompletionNanos}, rescheduling the periodic trigger to fire once it has.
+ *
+ * @throws CheckpointException with reason {@code MINIMUM_TIME_BETWEEN_CHECKPOINTS} if the minimum pause has not yet elapsed.
+ */
+ private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
+ final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; // assumes lastCheckpointCompletionNanos is a System.nanoTime() reading - confirm
+ final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; // truncating division: less than 1 ms remaining counts as elapsed
+
+ if (durationTillNextMillis > 0) {
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel(false);
+ currentPeriodicTrigger = null;
+ }
+ // Restart periodic triggering, first firing once the minimum pause has elapsed
+ currentPeriodicTrigger = scheduleTriggerWithDelay(durationTillNextMillis);
+
+ throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+ }
+ }
+
+ private long getRandomInitDelay() { // random initial delay in ms, uniformly drawn from [minPauseBetweenCheckpoints in ms, baseInterval]
+ return ThreadLocalRandom.current().nextLong( // NOTE(review): throws IllegalArgumentException if the lower bound exceeds baseInterval; presumably the constructor prevents this - confirm
+ minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L); // upper bound is exclusive, hence + 1
+ }
+
+ private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) { // runs a ScheduledTrigger after initDelay ms, then at a fixed rate of baseInterval ms; the returned future allows cancellation
+ return timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ initDelay, baseInterval, TimeUnit.MILLISECONDS);
+ }
+
// ------------------------------------------------------------------------
// job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
@@ -1391,6 +1393,13 @@ public class CheckpointCoordinator {
}
private void failPendingCheckpoint(
+ final PendingCheckpoint pendingCheckpoint,
+ final CheckpointFailureReason reason) {
+
+ failPendingCheckpoint(pendingCheckpoint, reason, null); // no underlying cause
+ }
+
+ private void failPendingCheckpoint( // aborts and reports the checkpoint, then resumes periodic triggering if it was suspended and nothing live is pending
final PendingCheckpoint pendingCheckpoint,
final CheckpointFailureReason reason,
final Throwable cause) {
@@ -1398,12 +1407,18 @@ public class CheckpointCoordinator {
CheckpointException exception = new CheckpointException(reason, cause);
pendingCheckpoint.abort(reason, cause);
failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId());
- }
- private void failPendingCheckpoint(
- final PendingCheckpoint pendingCheckpoint,
- final CheckpointFailureReason reason) {
+ if (!shutdown && periodicScheduling && currentPeriodicTrigger == null) { // unsynchronized fast-path check
+ synchronized (lock) { // re-check under the lock: another thread may have rescheduled the trigger or shut down meanwhile
+ if (!shutdown && periodicScheduling && currentPeriodicTrigger == null && allPendingCheckpointsDiscarded()) {
+ triggerRequestQueued = false; // clear the queued-request flag (set e.g. by checkConcurrentCheckpoints)
+ currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay()); // resume with a random initial delay
+ }
+ }
+ }
+ }
- failPendingCheckpoint(pendingCheckpoint, reason, null);
+ private boolean allPendingCheckpointsDiscarded() { // also true when there are no pending checkpoints (allMatch on an empty stream)
+ return pendingCheckpoints.values().stream().allMatch(PendingCheckpoint::isDiscarded);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 2c0c148..9a93bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -209,7 +209,7 @@ public class FailoverRegion {
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
// if we have checkpointed state, reload it into the executions
if (executionGraph.getCheckpointCoordinator() != null) {
- // we restart the checkpoint scheduler for
+ // we abort pending checkpoints for
// i) enable new checkpoint could be triggered without waiting for last checkpoint expired.
// ii) ensure the EXACTLY_ONCE semantics if needed.
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
new file mode 100644
index 0000000..1d77716
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.NeverCompleteFuture;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the interaction between the {@link FailoverStrategy} and the {@link CheckpointCoordinator}.
+ */
+public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
+ private ManuallyTriggeredScheduledExecutor manualThreadExecutor; // drives the coordinator's periodic triggers manually
+
+ @Before
+ public void setUp() {
+ manualThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+ }
+
+ /**
+ * Tests that {@link CheckpointCoordinator#abortPendingCheckpoints(CheckpointException)}, as called by
+ * {@link AdaptedRestartPipelinedRegionStrategyNG} or {@link FailoverRegion}, re-installs the periodic
+ * trigger after it was cancelled ({@code currentPeriodicTrigger} set to null) due to too many pending checkpoints.
+ */
+ @Test
+ public void testAbortPendingCheckpointsWithTriggerValidation() {
+ final int maxConcurrentCheckpoints = ThreadLocalRandom.current().nextInt(10) + 1; // random limit in [1, 10]
+ ExecutionVertex executionVertex = mockExecutionVertex();
+ CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ 0,
+ maxConcurrentCheckpoints,
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(
+ new JobID(),
+ checkpointCoordinatorConfiguration,
+ new ExecutionVertex[] { executionVertex },
+ new ExecutionVertex[] { executionVertex },
+ new ExecutionVertex[] { executionVertex },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY,
+ mock(CheckpointFailureManager.class));
+
+ // report the current execution as RUNNING so that checkpoints can be triggered.
+ mockExecutionRunning(executionVertex);
+
+ // swap in a timer that only fires periodic checkpoints when the manual executor is triggered.
+ ManualCheckpointTimer manualCheckpointTimer = new ManualCheckpointTimer(manualThreadExecutor);
+ // use an initial delay of 0 so the first periodic checkpoint fires as soon as the manual executor runs its scheduled tasks;
+ // this avoids the random initial delay before the first checkpoint (introduced via FLINK-9352).
+ manualCheckpointTimer.setManualDelay(0L);
+ Whitebox.setInternalState(checkpointCoordinator, "timer", manualCheckpointTimer); // NOTE(review): the previous timer instance is not shut down here
+
+ checkpointCoordinator.startCheckpointScheduler();
+ assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+ manualThreadExecutor.triggerAll();
+ manualThreadExecutor.triggerScheduledTasks();
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); // the zero-delay periodic trigger started one checkpoint
+
+ for (int i = 1; i < maxConcurrentCheckpoints; i++) { // fill the remaining slots; the periodic trigger must stay installed
+ checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+ assertEquals(i + 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+ }
+
+ // the concurrent-checkpoint limit is now reached: one more trigger is rejected (pending count unchanged)
+ // and the coordinator cancels the periodic trigger, so currentPeriodicTrigger becomes null.
+ checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+ assertFalse(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+ assertEquals(maxConcurrentCheckpoints, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+ checkpointCoordinator.abortPendingCheckpoints( // abort everything, as a regional failover does
+ new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+ // aborting all pending checkpoints must re-install the periodic trigger.
+ assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ }
+
+ private ExecutionVertex mockExecutionVertex() { // mock vertex whose current attempt has a fixed attempt ID
+ ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+ ExecutionVertex executionVertex = mock(ExecutionVertex.class);
+ Execution execution = Mockito.mock(Execution.class);
+ when(execution.getAttemptId()).thenReturn(executionAttemptID);
+ when(executionVertex.getCurrentExecutionAttempt()).thenReturn(execution);
+ return executionVertex;
+ }
+
+ private void mockExecutionRunning(ExecutionVertex executionVertex) { // stub the current attempt's state as RUNNING
+ when(executionVertex.getCurrentExecutionAttempt().getState()).thenReturn(ExecutionState.RUNNING);
+ }
+
+ public static class ManualCheckpointTimer extends ScheduledThreadPoolExecutor { // periodic tasks run on the wrapped executor; one-shot Runnable schedules are dropped
+ private final ScheduledExecutor scheduledExecutor;
+ private long manualDelay = 0; // initial delay used for every periodic schedule, replacing the requested one
+
+ ManualCheckpointTimer(final ScheduledExecutor scheduledExecutor) {
+ super(0); // no core threads; scheduling is handled by the overrides below
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
+ void setManualDelay(long manualDelay) {
+ this.manualDelay = manualDelay;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ // used for the checkpoint canceller: the command is dropped and a never-completing future is returned, so pending checkpoints are not cancelled.
+ return new NeverCompleteFuture(delay);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ // used to schedule periodic checkpoints: delegates to the wrapped executor with a fixed delay,
+ // replacing the requested initial delay with the configured 'manualDelay'.
+ return scheduledExecutor.scheduleWithFixedDelay(command, manualDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void execute(Runnable command) { // immediate tasks are handed to the wrapped executor too
+ scheduledExecutor.execute(command);
+ }
+ }
+}