You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/12 13:49:04 UTC

[flink] 01/05: [FLINK-18137] Handle discarding of triggering checkpoint correctly

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90a7dab894b12b85f21d4cfc637d0d770841cfe5
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jun 11 16:17:51 2020 +0200

    [FLINK-18137] Handle discarding of triggering checkpoint correctly
    
    Previously, discarding a triggering checkpoint could cause an NPE, which would stop
    the processing of subsequent checkpoint requests. This commit changes this behaviour
    by checking for this condition and instantiating a proper exception in case a
    triggering checkpoint is being discarded.
    
    This closes #12611.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 52 ++++++++++++--------
 .../CheckpointCoordinatorTriggeringTest.java       | 57 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..6c033d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -544,27 +544,39 @@ public class CheckpointCoordinator {
 						final PendingCheckpoint checkpoint =
 							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
 
-						if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
-							// no exception, no discarding, everything is OK
-							final long checkpointId = checkpoint.getCheckpointId();
-							snapshotTaskState(
-								timestamp,
-								checkpointId,
-								checkpoint.getCheckpointStorageLocation(),
-								request.props,
-								executions,
-								request.advanceToEndOfTime);
-
-							coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-							onTriggerSuccess();
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
+							} else {
+								onTriggerFailure(checkpoint, throwable);
+							}
 						} else {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
+							}
 						}
 					},
 					timer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 140441d..3dca350 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -45,14 +48,19 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -529,6 +537,48 @@ checkpointCoordinator.startCheckpointScheduler();
 		assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
 	}
 
+	/**
+	 * This test is not deterministic: if the problem it guards against is present, it only fails eventually (not on every run).
+	 */
+	@Test
+	public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+		final ExecutionVertex executionVertex = mockExecutionVertex(new ExecutionAttemptID());
+
+		final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+		final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+			.setTasks(new ExecutionVertex[]{executionVertex})
+			.setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService))
+			.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+				.build())
+			.build();
+
+		final CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
+		final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+		checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+		try {
+			checkpointCoordinator.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+			triggerCheckpointLatch.await();
+			masterHookCheckpointFuture.complete("Completed");
+
+			// discard triggering checkpoint
+			checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+			try {
+				// verify that the second checkpoint request will be executed and eventually times out
+				secondCheckpoint.get();
+				fail("Expected the second checkpoint to fail.");
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(CheckpointException.class));
+			}
+		} finally {
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
+			ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, scheduledExecutorService);
+		}
+	}
+
 	private CheckpointCoordinator createCheckpointCoordinator() {
 		return new CheckpointCoordinatorBuilder()
 			.setTimer(manuallyTriggeredScheduledExecutor)
@@ -568,9 +618,15 @@ checkpointCoordinator.startCheckpointScheduler();
 			new CheckpointCoordinatorTestingUtils.StringSerializer();
 
 		private final CompletableFuture<String> checkpointFuture;
+		private final OneShotLatch triggerCheckpointLatch;
 
 		private TestingMasterHook(CompletableFuture<String> checkpointFuture) {
+			this(checkpointFuture, new OneShotLatch());
+		}
+
+		private TestingMasterHook(CompletableFuture<String> checkpointFuture, OneShotLatch triggerCheckpointLatch) {
 			this.checkpointFuture = checkpointFuture;
+			this.triggerCheckpointLatch = triggerCheckpointLatch;
 		}
 
 		@Override
@@ -582,6 +638,7 @@ checkpointCoordinator.startCheckpointScheduler();
 		@Override
 		public CompletableFuture<String> triggerCheckpoint(
 			long checkpointId, long timestamp, Executor executor) {
+			triggerCheckpointLatch.trigger();
 			return checkpointFuture;
 		}