You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/12 13:49:04 UTC
[flink] 01/05: [FLINK-18137] Handle discarding of triggering
checkpoint correctly
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90a7dab894b12b85f21d4cfc637d0d770841cfe5
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jun 11 16:17:51 2020 +0200
[FLINK-18137] Handle discarding of triggering checkpoint correctly
Previously, discarding a checkpoint that was still being triggered could cause an NPE, which would stop the
processing of subsequent checkpoint requests. This commit changes this behaviour
by checking for this condition and instantiating a proper exception in case a
triggering checkpoint is being discarded.
This closes #12611.
---
.../runtime/checkpoint/CheckpointCoordinator.java | 52 ++++++++++++--------
.../CheckpointCoordinatorTriggeringTest.java | 57 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..6c033d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -544,27 +544,39 @@ public class CheckpointCoordinator {
final PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
- if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
- // no exception, no discarding, everything is OK
- final long checkpointId = checkpoint.getCheckpointId();
- snapshotTaskState(
- timestamp,
- checkpointId,
- checkpoint.getCheckpointStorageLocation(),
- request.props,
- executions,
- request.advanceToEndOfTime);
-
- coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
- onTriggerSuccess();
+ Preconditions.checkState(
+ checkpoint != null || throwable != null,
+ "Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+ if (throwable != null) {
+ // the initialization might not be finished yet
+ if (checkpoint == null) {
+ onTriggerFailure(request, throwable);
+ } else {
+ onTriggerFailure(checkpoint, throwable);
+ }
} else {
- // the initialization might not be finished yet
- if (checkpoint == null) {
- onTriggerFailure(request, throwable);
- } else {
- onTriggerFailure(checkpoint, throwable);
- }
+ if (checkpoint.isDiscarded()) {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ checkpoint.getFailureCause()));
+ } else {
+ // no exception, no discarding, everything is OK
+ final long checkpointId = checkpoint.getCheckpointId();
+ snapshotTaskState(
+ timestamp,
+ checkpointId,
+ checkpoint.getCheckpointStorageLocation(),
+ request.props,
+ executions,
+ request.advanceToEndOfTime);
+
+ coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+ onTriggerSuccess();
+ }
}
},
timer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 140441d..3dca350 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
@@ -45,14 +48,19 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -529,6 +537,48 @@ checkpointCoordinator.startCheckpointScheduler();
assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
}
+ /**
+ * Tests that discarding a checkpoint that may still be triggering does not stall the next checkpoint request; the race is timing-dependent, so a regression may only make this test fail intermittently.
+ */
+ @Test
+ public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+ final ExecutionVertex executionVertex = mockExecutionVertex(new ExecutionAttemptID());
+
+ final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTasks(new ExecutionVertex[]{executionVertex})
+ .setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService))
+ .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+ .build())
+ .build();
+
+ final CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
+ final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+ checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+ try {
+ checkpointCoordinator.triggerCheckpoint(false); // the master hook returns the still-incomplete masterHookCheckpointFuture for this one
+ final CompletableFuture<CompletedCheckpoint> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+ triggerCheckpointLatch.await();
+ masterHookCheckpointFuture.complete("Completed");
+
+ // discard the checkpoint, which may still be in its triggering phase (this race is what the test targets)
+ checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ try {
+ // verify that the second checkpoint request is still executed and eventually fails (expected: it times out)
+ secondCheckpoint.get();
+ fail("Expected the second checkpoint to fail.");
+ } catch (ExecutionException ee) {
+ assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(CheckpointException.class));
+ }
+ } finally {
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
+ ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, scheduledExecutorService);
+ }
+ }
+
private CheckpointCoordinator createCheckpointCoordinator() {
return new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -568,9 +618,15 @@ checkpointCoordinator.startCheckpointScheduler();
new CheckpointCoordinatorTestingUtils.StringSerializer();
private final CompletableFuture<String> checkpointFuture;
+ private final OneShotLatch triggerCheckpointLatch;
private TestingMasterHook(CompletableFuture<String> checkpointFuture) {
+ this(checkpointFuture, new OneShotLatch());
+ }
+
+ private TestingMasterHook(CompletableFuture<String> checkpointFuture, OneShotLatch triggerCheckpointLatch) {
this.checkpointFuture = checkpointFuture;
+ this.triggerCheckpointLatch = triggerCheckpointLatch;
}
@Override
@@ -582,6 +638,7 @@ checkpointCoordinator.startCheckpointScheduler();
@Override
public CompletableFuture<String> triggerCheckpoint(
long checkpointId, long timestamp, Executor executor) {
+ triggerCheckpointLatch.trigger();
return checkpointFuture;
}