You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:58 UTC
[flink] 09/13: [FLINK-16177][checkpointing] Integrate
OperatorCoordinator fully with checkpointing.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 52f7b9d070f73ee6acf2a87ba81d3d243d01f845
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 12:43:58 2020 +0200
[FLINK-16177][checkpointing] Integrate OperatorCoordinator fully with checkpointing.
- This adds various tests for OperatorCoordinator checkpointing
- The checkpoint coordinator also restores state to the OperatorCoordinator
---
.../java/org/apache/flink/util/ExceptionUtils.java | 37 +++++
.../runtime/checkpoint/CheckpointCoordinator.java | 17 +++
.../checkpoint/StateAssignmentOperation.java | 8 +-
.../OperatorCoordinatorSchedulerTest.java | 156 ++++++++++++++++++++-
.../coordination/TestingOperatorCoordinator.java | 35 ++++-
.../runtime/scheduler/SchedulerTestingUtils.java | 36 +++++
.../apache/flink/core/testutils/FlinkMatchers.java | 61 ++++++--
7 files changed, 333 insertions(+), 17 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6512531..8982b29 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -462,6 +462,43 @@ public final class ExceptionUtils {
}
/**
+ * Checks whether a throwable chain contains a specific type of exception and returns it.
+ * This method handles {@link SerializedThrowable}s in the chain and deserializes them with
+ * the given ClassLoader.
+ *
+ * <p>SerializedThrowables are often used when exceptions might come from dynamically loaded code and
+ * be transported over RPC / HTTP for better error reporting.
+ * The receiving processes or threads might not have the dynamically loaded code available.
+ *
+ * @param throwable the throwable chain to check.
+ * @param searchType the type of exception to search for in the chain.
+ * @param classLoader the ClassLoader to use when encountering a SerializedThrowable.
+ * @return Optional throwable of the requested type if available, otherwise empty
+ */
+ public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
+ Throwable throwable,
+ Class<T> searchType,
+ ClassLoader classLoader) {
+
+ if (throwable == null || searchType == null) {
+ return Optional.empty();
+ }
+
+ Throwable t = throwable;
+ while (t != null) {
+ if (searchType.isAssignableFrom(t.getClass())) {
+ return Optional.of(searchType.cast(t));
+ } else if (t instanceof SerializedThrowable) {
+ t = ((SerializedThrowable) t).deserializeError(classLoader);
+ } else {
+ t = t.getCause();
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ /**
* Checks whether a throwable chain contains an exception matching a predicate and returns it.
*
* @param throwable the throwable chain to check.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5d2db1d..fbdf2d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -1210,6 +1211,8 @@ public class CheckpointCoordinator {
allowNonRestoredState,
LOG);
+ restoreStateToCoordinators(operatorStates);
+
// update metrics
if (statsTracker != null) {
@@ -1416,6 +1419,20 @@ public class CheckpointCoordinator {
initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
+ private void restoreStateToCoordinators(final Map<OperatorID, OperatorState> operatorStates) throws Exception {
+ for (OperatorCoordinatorCheckpointContext coordContext : coordinatorsToCheckpoint) {
+ final OperatorState state = operatorStates.get(coordContext.operatorId());
+ if (state == null) {
+ continue;
+ }
+
+ final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
+ if (coordinatorState != null) {
+ coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 9644430..8ce8e55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -89,7 +89,7 @@ public class StateAssignmentOperation {
// find the states of all operators belonging to this task
List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
- boolean statelessTask = true;
+ boolean statelessSubTasks = true;
for (OperatorIDPair operatorIDPair : operatorIDPairs) {
OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
@@ -99,12 +99,12 @@ public class StateAssignmentOperation {
operatorID,
executionJobVertex.getParallelism(),
executionJobVertex.getMaxParallelism());
- } else {
- statelessTask = false;
+ } else if (operatorState.getNumberCollectedStates() > 0) {
+ statelessSubTasks = false;
}
operatorStates.add(operatorState);
}
- if (!statelessTask) { // skip tasks where no operator has any state
+ if (!statelessSubTasks) { // skip tasks where no operator has any state
assignAttemptState(executionJobVertex, operatorStates);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 0354e3d..bb0e68e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -19,7 +19,9 @@
package org.apache.flink.runtime.operators.coordination;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -37,28 +39,36 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Matcher;
+import org.junit.After;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -77,6 +87,15 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
+ private DefaultScheduler createdScheduler;
+
+ @After
+ public void shutdownScheduler() throws Exception{
+ if (createdScheduler != null) {
+ createdScheduler.suspend(new Exception("shutdown"));
+ }
+ }
+
// ------------------------------------------------------------------------
// tests for scheduling
// ------------------------------------------------------------------------
@@ -184,6 +203,86 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
}
// ------------------------------------------------------------------------
+ // tests for checkpointing
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testTakeCheckpoint() throws Exception {
+ final byte[] checkpointData = new byte[656];
+ new Random().nextBytes(checkpointData);
+
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
+ coordinator.getLastTriggeredCheckpoint().complete(checkpointData);
+ acknowledgeCurrentCheckpoint(scheduler);
+
+ final OperatorState state = checkpointFuture.get().getOperatorStates().get(testOperatorId);
+ assertArrayEquals(checkpointData, getStateHandleContents(state.getCoordinatorState()));
+ }
+
+ @Test
+ public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {
+ final OperatorCoordinator.Provider failingCoordinatorProvider =
+ new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsCheckpointing::new);
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks(failingCoordinatorProvider);
+
+ final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);
+
+ assertThat(checkpointFuture, futureWillCompleteWithTestException());
+ }
+
+ @Test
+ public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);
+ final CompletableFuture<?> coordinatorStateFuture = coordinator.getLastTriggeredCheckpoint();
+
+ coordinatorStateFuture.completeExceptionally(new TestException());
+
+ assertThat(checkpointFuture, futureWillCompleteWithTestException());
+ }
+
+ @Test
+ public void testSavepointRestoresCoordinator() throws Exception {
+ final byte[] testCoordinatorState = new byte[123];
+ new Random().nextBytes(testCoordinatorState);
+
+ final DefaultScheduler scheduler = createSchedulerWithRestoredSavepoint(testCoordinatorState);
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ final byte[] restoredState = coordinator.getLastRestoredCheckpointState();
+ assertArrayEquals(testCoordinatorState, restoredState);
+ }
+
+ @Test
+ public void testGlobalFailureResetsToCheckpoint() throws Exception {
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ final byte[] coordinatorState = new byte[] {7, 11, 3, 5};
+ takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
+ failGlobalAndRestart(scheduler, new TestException());
+
+ assertArrayEquals("coordinator should have a restored checkpoint",
+ coordinatorState, coordinator.getLastRestoredCheckpointState());
+ }
+
+ @Test
+ public void testConfirmCheckpointComplete() throws Exception {
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
+
+ assertEquals("coordinator should be notified of completed checkpoint",
+ checkpointId, coordinator.getLastCheckpointComplete());
+ }
+
+ // ------------------------------------------------------------------------
// tests for REST request delivery
// ------------------------------------------------------------------------
@@ -312,7 +411,8 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
- final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
+ final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
+ final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId, Collections.singletonList(opIds));
vertex.setInvokableClass(NoOpInvokable.class);
vertex.addOperatorCoordinator(new SerializedValue<>(provider));
vertex.setParallelism(2);
@@ -328,6 +428,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
: SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ this.createdScheduler = scheduler;
return scheduler;
}
@@ -370,6 +471,43 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
}
+ private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) {
+ scheduler.handleGlobalFailure(reason);
+ SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
+
+ executor.triggerScheduledTasks(); // this handles the restart / redeploy
+ SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+ // guard the test assumptions: This must bring the tasks back to RUNNING
+ assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+ }
+
+ private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
+ final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
+ executor.triggerAll();
+ return future;
+ }
+
+ private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
+ executor.triggerAll();
+ SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);
+ executor.triggerAll();
+ }
+
+ private long takeCompleteCheckpoint(
+ DefaultScheduler scheduler,
+ TestingOperatorCoordinator testingOperatorCoordinator,
+ byte[] coordinatorState) throws Exception {
+
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
+
+ testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(coordinatorState);
+ acknowledgeCurrentCheckpoint(scheduler);
+
+ // wait until checkpoint has completed
+ return checkpointFuture.get().getCheckpointID();
+ }
+
// ------------------------------------------------------------------------
// miscellaneous utilities
// ------------------------------------------------------------------------
@@ -395,6 +533,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
return out.toByteArray();
}
+ private static <T> Matcher<CompletableFuture<T>> futureWillCompleteWithTestException() {
+ return futureWillCompleteExceptionally(
+ (e) -> ExceptionUtils.findThrowableSerializedAware(
+ e, TestException.class, OperatorCoordinatorSchedulerTest.class.getClassLoader()).isPresent(),
+ Duration.ofSeconds(10),
+ "A TestException in the cause chain");
+ }
+
+ private static byte[] getStateHandleContents(StreamStateHandle stateHandle) {
+ if (stateHandle instanceof ByteStreamStateHandle) {
+ return ((ByteStreamStateHandle) stateHandle).getData();
+ }
+ fail("other state handles not implemented");
+ return null;
+ }
+
// ------------------------------------------------------------------------
// test mocks
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 9889ba8..5d93bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -25,7 +25,9 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* A simple testing implementation of the {@link OperatorCoordinator}.
@@ -36,11 +38,20 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
private final ArrayList<Integer> failedTasks = new ArrayList<>();
+ @Nullable
+ private byte[] lastRestoredCheckpointState;
+
+ private BlockingQueue<CompletableFuture<byte[]>> triggeredCheckpoints;
+
+ private BlockingQueue<Long> lastCheckpointComplete;
+
private boolean started;
private boolean closed;
public TestingOperatorCoordinator(OperatorCoordinator.Context context) {
this.context = context;
+ this.triggeredCheckpoints = new LinkedBlockingQueue<>();
+ this.lastCheckpointComplete = new LinkedBlockingQueue<>();
}
// ------------------------------------------------------------------------
@@ -65,17 +76,22 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
@Override
public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
- throw new UnsupportedOperationException();
+ final CompletableFuture<byte[]> coordinatorStateFuture = new CompletableFuture<>();
+
+ boolean added = triggeredCheckpoints.offer(coordinatorStateFuture);
+ assert added; // guard the test assumptions
+
+ return coordinatorStateFuture;
}
@Override
public void checkpointComplete(long checkpointId) {
- throw new UnsupportedOperationException();
+ lastCheckpointComplete.offer(checkpointId);
}
@Override
public void resetToCheckpoint(byte[] checkpointData) {
- throw new UnsupportedOperationException();
+ lastRestoredCheckpointState = checkpointData;
}
// ------------------------------------------------------------------------
@@ -96,6 +112,19 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
return failedTasks;
}
+ @Nullable
+ public byte[] getLastRestoredCheckpointState() {
+ return lastRestoredCheckpointState;
+ }
+
+ public CompletableFuture<byte[]> getLastTriggeredCheckpoint() throws InterruptedException {
+ return triggeredCheckpoints.take();
+ }
+
+ public long getLastCheckpointComplete() throws InterruptedException {
+ return lastCheckpointComplete.take();
+ }
+
// ------------------------------------------------------------------------
// The provider for this coordinator implementation
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 40fe221..124e733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -89,6 +90,7 @@ import java.util.stream.StreamSupport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
/**
* A utility class to create {@link DefaultScheduler} instances for testing.
@@ -262,6 +264,13 @@ public class SchedulerTestingUtils {
);
}
+ public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
+ final JobID jid = scheduler.requestJob().getJobID();
+ getAllCurrentExecutionAttempts(scheduler).forEach(
+ (attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
+ );
+ }
+
public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
final JobID jid = scheduler.requestJob().getJobID();
@@ -272,6 +281,33 @@ public class SchedulerTestingUtils {
}
}
+ public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
+ final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+ return checkpointCoordinator.triggerCheckpoint(false);
+ }
+
+ public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
+ final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+ assertEquals("Coordinator has not ", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+ final PendingCheckpoint pc = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
+
+ // because of races against the async thread in the coordinator, we need to wait here until the
+ // coordinator state is acknowledged. This can be removed once the CheckpointCoordinator
+ // executes all actions in the Scheduler's main thread executor.
+ while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ }
+ }
+
+ getAllCurrentExecutionAttempts(scheduler).forEach(
+ (attemptId) -> scheduler.acknowledgeCheckpoint(pc.getJobId(), attemptId, pc.getCheckpointId(), new CheckpointMetrics(), null));
+ }
+
public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
checkpointCoordinator.triggerCheckpoint(false);
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
index 0303272..ada7bf4 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
@@ -21,6 +21,8 @@ package org.apache.flink.core.testutils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -28,6 +30,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
/**
* Some reusable hamcrest matchers for Flink.
@@ -60,6 +63,18 @@ public class FlinkMatchers {
/**
* Checks whether {@link CompletableFuture} will completed exceptionally within a certain time.
*/
+ public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(
+ Function<Throwable, Boolean> exceptionCheck,
+ Duration timeout,
+ String checkDescription) {
+ Objects.requireNonNull(exceptionCheck, "exceptionType should not be null");
+ Objects.requireNonNull(timeout, "timeout should not be null");
+ return new FutureWillFailMatcher<>(exceptionCheck, timeout, checkDescription);
+ }
+
+ /**
+ * Checks whether {@link CompletableFuture} will complete exceptionally within a certain time.
+ */
public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(Duration timeout) {
return futureWillCompleteExceptionally(Throwable.class, timeout);
}
@@ -118,14 +133,31 @@ public class FlinkMatchers {
private static final class FutureWillFailMatcher<T> extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
- private final Class<? extends Throwable> expectedException;
+ private final Function<Throwable, Boolean> exceptionValidator;
private final Duration timeout;
- FutureWillFailMatcher(Class<? extends Throwable> expectedException, Duration timeout) {
+ private final String validationDescription;
+
+ FutureWillFailMatcher(
+ Class<? extends Throwable> expectedException,
+ Duration timeout) {
+
super(CompletableFuture.class);
- this.expectedException = expectedException;
+ this.exceptionValidator = (e) -> expectedException.isAssignableFrom(e.getClass());
this.timeout = timeout;
+ this.validationDescription = expectedException.getName();
+ }
+
+ FutureWillFailMatcher(
+ Function<Throwable, Boolean> exceptionValidator,
+ Duration timeout,
+ String validationDescription) {
+
+ super(CompletableFuture.class);
+ this.exceptionValidator = exceptionValidator;
+ this.timeout = timeout;
+ this.validationDescription = validationDescription;
}
@Override
@@ -144,18 +176,29 @@ public class FlinkMatchers {
return false;
}
catch (ExecutionException e) {
- if (e.getCause() == null || !expectedException.isAssignableFrom(e.getCause().getClass())) {
- mismatchDescription.appendText("Future completed with different exception: " + e.getCause());
- return false;
+ final Throwable cause = e.getCause();
+ if (cause != null && exceptionValidator.apply(cause)) {
+ return true;
+ }
+
+ String otherDescription = "(null)";
+ if (cause != null) {
+ final StringWriter stm = new StringWriter();
+ try (PrintWriter wrt = new PrintWriter(stm)) {
+ cause.printStackTrace(wrt);
+ }
+ otherDescription = stm.toString();
}
- return true;
+
+ mismatchDescription.appendText("Future completed with different exception: " + otherDescription);
+ return false;
}
}
@Override
public void describeTo(Description description) {
- description.appendText("A CompletableFuture that will failed within " +
- timeout.toMillis() + " milliseconds with: " + expectedException.getName());
+ description.appendText("A CompletableFuture that will have failed within " +
+ timeout.toMillis() + " milliseconds with: " + validationDescription);
}
}
}