You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/03 10:42:56 UTC
[flink] branch release-1.11 updated: [FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 772e8fb [FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
772e8fb is described below
commit 772e8fb634fa84b40afe58b314dcb02dd16b70b6
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 1 09:52:30 2021 +0100
[FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
Before this change, the original failure reason was hidden and replaced with CHECKPOINT_ASYNC_EXCEPTION.
---
.../runtime/tasks/AsyncCheckpointRunnable.java | 14 ++++-
.../runtime/tasks/AsyncCheckpointRunnableTest.java | 64 ++++++++++++++++------
2 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index 25254c2..5e4afa2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -222,11 +223,20 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
// Otherwise this followup exception could race the original exception in
// failing the task.
try {
+ Optional<CheckpointException> underlyingCheckpointException =
+ ExceptionUtils.findThrowable(
+ checkpointException, CheckpointException.class);
+
+ // If this failure is already a CheckpointException, do not overwrite the
+ // original CheckpointFailureReason
+ CheckpointFailureReason reportedFailureReason =
+ underlyingCheckpointException
+ .map(exception -> exception.getCheckpointFailureReason())
+ .orElse(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
taskEnvironment.declineCheckpoint(
checkpointMetaData.getCheckpointId(),
new CheckpointException(
- CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
- checkpointException));
+ reportedFailureReason, checkpointException));
} catch (Exception unhandled) {
AsynchronousException asyncException = new AsynchronousException(unhandled);
asyncExceptionHandler.handleAsyncException(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 97be4b2..9a96020 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -37,22 +37,21 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import java.util.function.Supplier;
/** Tests for {@link AsyncCheckpointRunnable}. */
public class AsyncCheckpointRunnableTest {
@Test
public void testDeclineWithAsyncCheckpointExceptionWhenRunning() {
- testAsyncCheckpointException(() -> true);
+ testAsyncCheckpointException(true);
}
@Test
public void testDeclineWithAsyncCheckpointExceptionWhenNotRunning() {
- testAsyncCheckpointException(() -> false);
+ testAsyncCheckpointException(false);
}
- private void testAsyncCheckpointException(Supplier<Boolean> isTaskRunning) {
+ private void testAsyncCheckpointException(boolean isTaskRunning) {
final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
snapshotsInProgress.put(
new OperatorID(),
@@ -67,20 +66,10 @@ public class AsyncCheckpointRunnableTest {
final TestEnvironment environment = new TestEnvironment();
final AsyncCheckpointRunnable runnable =
- new AsyncCheckpointRunnable(
- snapshotsInProgress,
- new CheckpointMetaData(1, 1L),
- new CheckpointMetrics(),
- 1L,
- "Task Name",
- r -> {},
- r -> {},
- environment,
- (msg, ex) -> {},
- isTaskRunning);
+ createAsyncRunnable(snapshotsInProgress, environment, isTaskRunning);
runnable.run();
- if (isTaskRunning.get()) {
+ if (isTaskRunning) {
Assert.assertTrue(environment.getCause() instanceof CheckpointException);
Assert.assertSame(
((CheckpointException) environment.getCause()).getCheckpointFailureReason(),
@@ -90,6 +79,49 @@ public class AsyncCheckpointRunnableTest {
}
}
+ @Test
+ public void testDeclineAsyncCheckpoint() {
+ CheckpointFailureReason originalReason =
+ CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
+
+ final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+ snapshotsInProgress.put(
+ new OperatorID(),
+ new OperatorSnapshotFutures(
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ ExceptionallyDoneFuture.of(new CheckpointException(originalReason)),
+ DoneFuture.of(SnapshotResult.empty())));
+
+ final TestEnvironment environment = new TestEnvironment();
+ final AsyncCheckpointRunnable runnable =
+ createAsyncRunnable(snapshotsInProgress, environment, true);
+ runnable.run();
+
+ Assert.assertSame(
+ ((CheckpointException) environment.getCause()).getCheckpointFailureReason(),
+ originalReason);
+ }
+
+ private AsyncCheckpointRunnable createAsyncRunnable(
+ Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress,
+ TestEnvironment environment,
+ boolean isTaskRunning) {
+ return new AsyncCheckpointRunnable(
+ snapshotsInProgress,
+ new CheckpointMetaData(1, 1L),
+ new CheckpointMetrics(),
+ 1L,
+ "Task Name",
+ r -> {},
+ r -> {},
+ environment,
+ (msg, ex) -> {},
+ () -> isTaskRunning);
+ }
+
private static class TestEnvironment extends StreamMockEnvironment {
Throwable cause = null;