You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:58 UTC

[flink] 09/13: [FLINK-16177][checkpointing] Integrate OperatorCoordinator fully with checkpointing.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52f7b9d070f73ee6acf2a87ba81d3d243d01f845
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 12:43:58 2020 +0200

    [FLINK-16177][checkpointing] Integrate OperatorCoordinator fully with checkpointing.
    
      - This adds various tests for OperatorCoordinator checkpointing
      - The checkpoint coordinator also restores state to the OperatorCoordinator
---
 .../java/org/apache/flink/util/ExceptionUtils.java |  37 +++++
 .../runtime/checkpoint/CheckpointCoordinator.java  |  17 +++
 .../checkpoint/StateAssignmentOperation.java       |   8 +-
 .../OperatorCoordinatorSchedulerTest.java          | 156 ++++++++++++++++++++-
 .../coordination/TestingOperatorCoordinator.java   |  35 ++++-
 .../runtime/scheduler/SchedulerTestingUtils.java   |  36 +++++
 .../apache/flink/core/testutils/FlinkMatchers.java |  61 ++++++--
 7 files changed, 333 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6512531..8982b29 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -462,6 +462,43 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether a throwable chain contains a specific type of exception and returns it.
+	 * This method handles {@link SerializedThrowable}s in the chain and deserializes them with
+	 * the given ClassLoader.
+	 *
+	 * <p>SerializedThrowables are often used when exceptions might come from dynamically loaded code and
+	 * be transported over RPC / HTTP for better error reporting.
+	 * The receiving processes or threads might not have the dynamically loaded code available.
+	 *
+	 * @param throwable the throwable chain to check.
+	 * @param searchType the type of exception to search for in the chain.
+	 * @param classLoader the ClassLoader to use when encountering a SerializedThrowable.
+	 * @return Optional throwable of the requested type if available, otherwise empty
+	 */
+	public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
+			Throwable throwable,
+			Class<T> searchType,
+			ClassLoader classLoader) {
+
+		if (throwable == null || searchType == null) {
+			return Optional.empty();
+		}
+
+		Throwable t = throwable;
+		while (t != null) {
+			if (searchType.isAssignableFrom(t.getClass())) {
+				return Optional.of(searchType.cast(t));
+			} else if (t instanceof SerializedThrowable) {
+				t = ((SerializedThrowable) t).deserializeError(classLoader);
+			} else {
+				t = t.getCause();
+			}
+		}
+
+		return Optional.empty();
+	}
+
+	/**
 	 * Checks whether a throwable chain contains an exception matching a predicate and returns it.
 	 *
 	 * @param throwable the throwable chain to check.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5d2db1d..fbdf2d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -1210,6 +1211,8 @@ public class CheckpointCoordinator {
 					allowNonRestoredState,
 					LOG);
 
+			restoreStateToCoordinators(operatorStates);
+
 			// update metrics
 
 			if (statsTracker != null) {
@@ -1416,6 +1419,20 @@ public class CheckpointCoordinator {
 			initDelay, baseInterval, TimeUnit.MILLISECONDS);
 	}
 
+	private void restoreStateToCoordinators(final Map<OperatorID, OperatorState> operatorStates) throws Exception {
+		for (OperatorCoordinatorCheckpointContext coordContext : coordinatorsToCheckpoint) {
+			final OperatorState state = operatorStates.get(coordContext.operatorId());
+			if (state == null) {
+				continue;
+			}
+
+			final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
+			if (coordinatorState != null) {
+				coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  job status listener that schedules / cancels periodic checkpoints
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 9644430..8ce8e55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -89,7 +89,7 @@ public class StateAssignmentOperation {
 			// find the states of all operators belonging to this task
 			List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
 			List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
-			boolean statelessTask = true;
+			boolean statelessSubTasks = true;
 			for (OperatorIDPair operatorIDPair : operatorIDPairs) {
 				OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
 
@@ -99,12 +99,12 @@ public class StateAssignmentOperation {
 						operatorID,
 						executionJobVertex.getParallelism(),
 						executionJobVertex.getMaxParallelism());
-				} else {
-					statelessTask = false;
+				} else if (operatorState.getNumberCollectedStates() > 0) {
+					statelessSubTasks = false;
 				}
 				operatorStates.add(operatorState);
 			}
-			if (!statelessTask) { // skip tasks where no operator has any state
+			if (!statelessSubTasks) { // skip tasks where no operator has any state
 				assignAttemptState(executionJobVertex, operatorStates);
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 0354e3d..bb0e68e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -37,28 +39,36 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matcher;
+import org.junit.After;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -77,6 +87,15 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 
 	private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
 
+	private DefaultScheduler createdScheduler;
+
+	@After
+	public void shutdownScheduler() throws Exception{
+		if (createdScheduler != null) {
+			createdScheduler.suspend(new Exception("shutdown"));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  tests for scheduling
 	// ------------------------------------------------------------------------
@@ -184,6 +203,86 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	// ------------------------------------------------------------------------
+	//  tests for checkpointing
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTakeCheckpoint() throws Exception {
+		final byte[] checkpointData = new byte[656];
+		new Random().nextBytes(checkpointData);
+
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
+		coordinator.getLastTriggeredCheckpoint().complete(checkpointData);
+		acknowledgeCurrentCheckpoint(scheduler);
+
+		final OperatorState state = checkpointFuture.get().getOperatorStates().get(testOperatorId);
+		assertArrayEquals(checkpointData, getStateHandleContents(state.getCoordinatorState()));
+	}
+
+	@Test
+	public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {
+		final OperatorCoordinator.Provider failingCoordinatorProvider =
+			new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsCheckpointing::new);
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks(failingCoordinatorProvider);
+
+		final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);
+
+		assertThat(checkpointFuture, futureWillCompleteWithTestException());
+	}
+
+	@Test
+	public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);
+		final CompletableFuture<?> coordinatorStateFuture = coordinator.getLastTriggeredCheckpoint();
+
+		coordinatorStateFuture.completeExceptionally(new TestException());
+
+		assertThat(checkpointFuture, futureWillCompleteWithTestException());
+	}
+
+	@Test
+	public void testSavepointRestoresCoordinator() throws Exception {
+		final byte[] testCoordinatorState = new byte[123];
+		new Random().nextBytes(testCoordinatorState);
+
+		final DefaultScheduler scheduler = createSchedulerWithRestoredSavepoint(testCoordinatorState);
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final byte[] restoredState = coordinator.getLastRestoredCheckpointState();
+		assertArrayEquals(testCoordinatorState, restoredState);
+	}
+
+	@Test
+	public void testGlobalFailureResetsToCheckpoint() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final byte[] coordinatorState = new byte[] {7, 11, 3, 5};
+		takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
+		failGlobalAndRestart(scheduler, new TestException());
+
+		assertArrayEquals("coordinator should have a restored checkpoint",
+				coordinatorState, coordinator.getLastRestoredCheckpointState());
+	}
+
+	@Test
+	public void testConfirmCheckpointComplete() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
+
+		assertEquals("coordinator should be notified of completed checkpoint",
+				checkpointId, coordinator.getLastCheckpointComplete());
+	}
+
+	// ------------------------------------------------------------------------
 	//  tests for REST request delivery
 	// ------------------------------------------------------------------------
 
@@ -312,7 +411,8 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
 			@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
 
-		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
+		final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
+		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId, Collections.singletonList(opIds));
 		vertex.setInvokableClass(NoOpInvokable.class);
 		vertex.addOperatorCoordinator(new SerializedValue<>(provider));
 		vertex.setParallelism(2);
@@ -328,6 +428,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 				: SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
 		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
+		this.createdScheduler = scheduler;
 		return scheduler;
 	}
 
@@ -370,6 +471,43 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
 	}
 
+	private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) {
+		scheduler.handleGlobalFailure(reason);
+		SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
+
+		executor.triggerScheduledTasks();   // this handles the restart / redeploy
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+		// guard the test assumptions: This must bring the tasks back to RUNNING
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+	}
+
+	private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
+		final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
+		executor.triggerAll();
+		return future;
+	}
+
+	private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
+		executor.triggerAll();
+		SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);
+		executor.triggerAll();
+	}
+
+	private long takeCompleteCheckpoint(
+			DefaultScheduler scheduler,
+			TestingOperatorCoordinator testingOperatorCoordinator,
+			byte[] coordinatorState) throws Exception {
+
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
+
+		testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(coordinatorState);
+		acknowledgeCurrentCheckpoint(scheduler);
+
+		// wait until checkpoint has completed
+		return checkpointFuture.get().getCheckpointID();
+	}
+
 	// ------------------------------------------------------------------------
 	//  miscellaneous utilities
 	// ------------------------------------------------------------------------
@@ -395,6 +533,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		return out.toByteArray();
 	}
 
+	private static <T> Matcher<CompletableFuture<T>> futureWillCompleteWithTestException() {
+		return futureWillCompleteExceptionally(
+				(e) -> ExceptionUtils.findThrowableSerializedAware(
+						e, TestException.class, OperatorCoordinatorSchedulerTest.class.getClassLoader()).isPresent(),
+				Duration.ofSeconds(10),
+				"A TestException in the cause chain");
+	}
+
+	private static byte[] getStateHandleContents(StreamStateHandle stateHandle) {
+		if (stateHandle instanceof ByteStreamStateHandle) {
+			return ((ByteStreamStateHandle) stateHandle).getData();
+		}
+		fail("other state handles not implemented");
+		return null;
+	}
+
 	// ------------------------------------------------------------------------
 	//  test mocks
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 9889ba8..5d93bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -25,7 +25,9 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * A simple testing implementation of the {@link OperatorCoordinator}.
@@ -36,11 +38,20 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 
 	private final ArrayList<Integer> failedTasks = new ArrayList<>();
 
+	@Nullable
+	private byte[] lastRestoredCheckpointState;
+
+	private BlockingQueue<CompletableFuture<byte[]>> triggeredCheckpoints;
+
+	private BlockingQueue<Long> lastCheckpointComplete;
+
 	private boolean started;
 	private boolean closed;
 
 	public TestingOperatorCoordinator(OperatorCoordinator.Context context) {
 		this.context = context;
+		this.triggeredCheckpoints = new LinkedBlockingQueue<>();
+		this.lastCheckpointComplete = new LinkedBlockingQueue<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -65,17 +76,22 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 
 	@Override
 	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
-		throw new UnsupportedOperationException();
+		final CompletableFuture<byte[]> coordinatorStateFuture = new CompletableFuture<>();
+
+		boolean added = triggeredCheckpoints.offer(coordinatorStateFuture);
+		assert added; // guard the test assumptions
+
+		return coordinatorStateFuture;
 	}
 
 	@Override
 	public void checkpointComplete(long checkpointId) {
-		throw new UnsupportedOperationException();
+		lastCheckpointComplete.offer(checkpointId);
 	}
 
 	@Override
 	public void resetToCheckpoint(byte[] checkpointData) {
-		throw new UnsupportedOperationException();
+		lastRestoredCheckpointState = checkpointData;
 	}
 
 	// ------------------------------------------------------------------------
@@ -96,6 +112,19 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 		return failedTasks;
 	}
 
+	@Nullable
+	public byte[] getLastRestoredCheckpointState() {
+		return lastRestoredCheckpointState;
+	}
+
+	public CompletableFuture<byte[]> getLastTriggeredCheckpoint() throws InterruptedException {
+		return triggeredCheckpoints.take();
+	}
+
+	public long getLastCheckpointComplete() throws InterruptedException {
+		return lastCheckpointComplete.take();
+	}
+
 	// ------------------------------------------------------------------------
 	//  The provider for this coordinator implementation
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 40fe221..124e733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -89,6 +90,7 @@ import java.util.stream.StreamSupport;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 /**
  * A utility class to create {@link DefaultScheduler} instances for testing.
@@ -262,6 +264,13 @@ public class SchedulerTestingUtils {
 		);
 	}
 
+	public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
+		final JobID jid = scheduler.requestJob().getJobID();
+		getAllCurrentExecutionAttempts(scheduler).forEach(
+			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
+		);
+	}
+
 	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
 		final JobID jid = scheduler.requestJob().getJobID();
@@ -272,6 +281,33 @@ public class SchedulerTestingUtils {
 		}
 	}
 
+	public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		return checkpointCoordinator.triggerCheckpoint(false);
+	}
+
+	public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		assertEquals("Coordinator has not ", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		final PendingCheckpoint pc = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
+
+		// because of races against the async thread in the coordinator, we need to wait here until the
+		// coordinator state is acknowledged. This can be removed once the CheckpointCoordinator
+		// executes all actions in the Scheduler's main thread executor.
+		while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
+			try {
+				Thread.sleep(1);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				fail("interrupted");
+			}
+		}
+
+		getAllCurrentExecutionAttempts(scheduler).forEach(
+			(attemptId) -> scheduler.acknowledgeCheckpoint(pc.getJobId(), attemptId, pc.getCheckpointId(), new CheckpointMetrics(), null));
+	}
+
 	public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
 		checkpointCoordinator.triggerCheckpoint(false);
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
index 0303272..ada7bf4 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
@@ -21,6 +21,8 @@ package org.apache.flink.core.testutils;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -28,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 /**
  * Some reusable hamcrest matchers for Flink.
@@ -60,6 +63,18 @@ public class FlinkMatchers {
 	/**
 	 * Checks whether {@link CompletableFuture} will completed exceptionally within a certain time.
 	 */
+	public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(
+			Function<Throwable, Boolean> exceptionCheck,
+			Duration timeout,
+			String checkDescription) {
+		Objects.requireNonNull(exceptionCheck, "exceptionType should not be null");
+		Objects.requireNonNull(timeout, "timeout should not be null");
+		return new FutureWillFailMatcher<>(exceptionCheck, timeout, checkDescription);
+	}
+
+	/**
+	 * Checks whether {@link CompletableFuture} will complete exceptionally within a certain time.
+	 */
 	public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(Duration timeout) {
 		return futureWillCompleteExceptionally(Throwable.class, timeout);
 	}
@@ -118,14 +133,31 @@ public class FlinkMatchers {
 
 	private static final class FutureWillFailMatcher<T> extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
 
-		private final Class<? extends Throwable> expectedException;
+		private final Function<Throwable, Boolean> exceptionValidator;
 
 		private final Duration timeout;
 
-		FutureWillFailMatcher(Class<? extends Throwable> expectedException, Duration timeout) {
+		private final String validationDescription;
+
+		FutureWillFailMatcher(
+			Class<? extends Throwable> expectedException,
+			Duration timeout) {
+
 			super(CompletableFuture.class);
-			this.expectedException = expectedException;
+			this.exceptionValidator = (e) -> expectedException.isAssignableFrom(e.getClass());
 			this.timeout = timeout;
+			this.validationDescription = expectedException.getName();
+		}
+
+		FutureWillFailMatcher(
+				Function<Throwable, Boolean> exceptionValidator,
+				Duration timeout,
+				String validationDescription) {
+
+			super(CompletableFuture.class);
+			this.exceptionValidator = exceptionValidator;
+			this.timeout = timeout;
+			this.validationDescription = validationDescription;
 		}
 
 		@Override
@@ -144,18 +176,29 @@ public class FlinkMatchers {
 				return false;
 			}
 			catch (ExecutionException e) {
-				if (e.getCause() == null || !expectedException.isAssignableFrom(e.getCause().getClass())) {
-					mismatchDescription.appendText("Future completed with different exception: " + e.getCause());
-					return false;
+				final Throwable cause = e.getCause();
+				if (cause != null && exceptionValidator.apply(cause)) {
+					return true;
+				}
+
+				String otherDescription = "(null)";
+				if (cause != null) {
+					final StringWriter stm = new StringWriter();
+					try (PrintWriter wrt = new PrintWriter(stm)) {
+						cause.printStackTrace(wrt);
+					}
+					otherDescription = stm.toString();
 				}
-				return true;
+
+				mismatchDescription.appendText("Future completed with different exception: " + otherDescription);
+				return false;
 			}
 		}
 
 		@Override
 		public void describeTo(Description description) {
-			description.appendText("A CompletableFuture that will failed within " +
-				timeout.toMillis() + " milliseconds with: " + expectedException.getName());
+			description.appendText("A CompletableFuture that will have failed within " +
+				timeout.toMillis() + " milliseconds with: " + validationDescription);
 		}
 	}
 }