You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:06:45 UTC
[flink] 01/02: [FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37a6e514d3c7653a8dd799a1f633f29501931169
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 1 09:52:30 2021 +0100
[FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
Before this change, the original failure reason was hidden and replaced with CHECKPOINT_ASYNC_EXCEPTION.
---
.../runtime/tasks/AsyncCheckpointRunnable.java | 14 ++++-
.../runtime/tasks/AsyncCheckpointRunnableTest.java | 64 ++++++++++++++++------
2 files changed, 59 insertions(+), 19 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index 59809b1..4c3fd7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -263,11 +264,20 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
// Otherwise this followup exception could race the original exception in
// failing the task.
try {
+ Optional<CheckpointException> underlyingCheckpointException =
+ ExceptionUtils.findThrowable(
+ checkpointException, CheckpointException.class);
+
+ // If this failure is already a CheckpointException, do not overwrite the
+ // original CheckpointFailureReason
+ CheckpointFailureReason reportedFailureReason =
+ underlyingCheckpointException
+ .map(exception -> exception.getCheckpointFailureReason())
+ .orElse(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
taskEnvironment.declineCheckpoint(
checkpointMetaData.getCheckpointId(),
new CheckpointException(
- CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
- checkpointException));
+ reportedFailureReason, checkpointException));
} catch (Exception unhandled) {
AsynchronousException asyncException = new AsynchronousException(unhandled);
asyncExceptionHandler.handleAsyncException(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 50551f1..0e4642e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -37,22 +37,21 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import java.util.function.Supplier;
/** Tests for {@link AsyncCheckpointRunnable}. */
public class AsyncCheckpointRunnableTest {
@Test
public void testDeclineWithAsyncCheckpointExceptionWhenRunning() {
- testAsyncCheckpointException(() -> true);
+ testAsyncCheckpointException(true);
}
@Test
public void testDeclineWithAsyncCheckpointExceptionWhenNotRunning() {
- testAsyncCheckpointException(() -> false);
+ testAsyncCheckpointException(false);
}
- private void testAsyncCheckpointException(Supplier<Boolean> isTaskRunning) {
+ private void testAsyncCheckpointException(boolean isTaskRunning) {
final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
snapshotsInProgress.put(
new OperatorID(),
@@ -67,28 +66,59 @@ public class AsyncCheckpointRunnableTest {
final TestEnvironment environment = new TestEnvironment();
final AsyncCheckpointRunnable runnable =
- new AsyncCheckpointRunnable(
- snapshotsInProgress,
- new CheckpointMetaData(1, 1L),
- new CheckpointMetricsBuilder(),
- 1L,
- "Task Name",
- r -> {},
- r -> {},
- environment,
- (msg, ex) -> {},
- isTaskRunning);
+ createAsyncRunnable(snapshotsInProgress, environment, isTaskRunning);
runnable.run();
- if (isTaskRunning.get()) {
+ if (isTaskRunning) {
Assert.assertSame(
- (environment.getCause()).getCheckpointFailureReason(),
+ environment.getCause().getCheckpointFailureReason(),
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
} else {
Assert.assertNull(environment.getCause());
}
}
+ @Test
+ public void testDeclineAsyncCheckpoint() {
+ CheckpointFailureReason originalReason =
+ CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
+
+ final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+ snapshotsInProgress.put(
+ new OperatorID(),
+ new OperatorSnapshotFutures(
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ ExceptionallyDoneFuture.of(new CheckpointException(originalReason)),
+ DoneFuture.of(SnapshotResult.empty())));
+
+ final TestEnvironment environment = new TestEnvironment();
+ final AsyncCheckpointRunnable runnable =
+ createAsyncRunnable(snapshotsInProgress, environment, true);
+ runnable.run();
+
+ Assert.assertSame(environment.getCause().getCheckpointFailureReason(), originalReason);
+ }
+
+ private AsyncCheckpointRunnable createAsyncRunnable(
+ Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress,
+ TestEnvironment environment,
+ boolean isTaskRunning) {
+ return new AsyncCheckpointRunnable(
+ snapshotsInProgress,
+ new CheckpointMetaData(1, 1L),
+ new CheckpointMetricsBuilder(),
+ 1L,
+ "Task Name",
+ r -> {},
+ r -> {},
+ environment,
+ (msg, ex) -> {},
+ () -> isTaskRunning);
+ }
+
private static class TestEnvironment extends StreamMockEnvironment {
CheckpointException cause = null;