You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/03 10:42:56 UTC

[flink] branch release-1.11 updated: [FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 772e8fb  [FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
772e8fb is described below

commit 772e8fb634fa84b40afe58b314dcb02dd16b70b6
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 1 09:52:30 2021 +0100

    [FLINK-21215][task] Do not overwrite the original CheckpointFailureReason in AsyncCheckpointRunnable
    
    Before this change, the original failure reason would be hidden and replaced with CHECKPOINT_ASYNC_EXCEPTION.
---
 .../runtime/tasks/AsyncCheckpointRunnable.java     | 14 ++++-
 .../runtime/tasks/AsyncCheckpointRunnableTest.java | 64 ++++++++++++++++------
 2 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index 25254c2..5e4afa2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -222,11 +223,20 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
                     // Otherwise this followup exception could race the original exception in
                     // failing the task.
                     try {
+                        Optional<CheckpointException> underlyingCheckpointException =
+                                ExceptionUtils.findThrowable(
+                                        checkpointException, CheckpointException.class);
+
+                        // If this failure is already a CheckpointException, do not overwrite the
+                        // original CheckpointFailureReason
+                        CheckpointFailureReason reportedFailureReason =
+                                underlyingCheckpointException
+                                        .map(exception -> exception.getCheckpointFailureReason())
+                                        .orElse(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
                         taskEnvironment.declineCheckpoint(
                                 checkpointMetaData.getCheckpointId(),
                                 new CheckpointException(
-                                        CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
-                                        checkpointException));
+                                        reportedFailureReason, checkpointException));
                     } catch (Exception unhandled) {
                         AsynchronousException asyncException = new AsynchronousException(unhandled);
                         asyncExceptionHandler.handleAsyncException(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 97be4b2..9a96020 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -37,22 +37,21 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.Supplier;
 
 /** Tests for {@link AsyncCheckpointRunnable}. */
 public class AsyncCheckpointRunnableTest {
 
     @Test
     public void testDeclineWithAsyncCheckpointExceptionWhenRunning() {
-        testAsyncCheckpointException(() -> true);
+        testAsyncCheckpointException(true);
     }
 
     @Test
     public void testDeclineWithAsyncCheckpointExceptionWhenNotRunning() {
-        testAsyncCheckpointException(() -> false);
+        testAsyncCheckpointException(false);
     }
 
-    private void testAsyncCheckpointException(Supplier<Boolean> isTaskRunning) {
+    private void testAsyncCheckpointException(boolean isTaskRunning) {
         final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
         snapshotsInProgress.put(
                 new OperatorID(),
@@ -67,20 +66,10 @@ public class AsyncCheckpointRunnableTest {
 
         final TestEnvironment environment = new TestEnvironment();
         final AsyncCheckpointRunnable runnable =
-                new AsyncCheckpointRunnable(
-                        snapshotsInProgress,
-                        new CheckpointMetaData(1, 1L),
-                        new CheckpointMetrics(),
-                        1L,
-                        "Task Name",
-                        r -> {},
-                        r -> {},
-                        environment,
-                        (msg, ex) -> {},
-                        isTaskRunning);
+                createAsyncRunnable(snapshotsInProgress, environment, isTaskRunning);
         runnable.run();
 
-        if (isTaskRunning.get()) {
+        if (isTaskRunning) {
             Assert.assertTrue(environment.getCause() instanceof CheckpointException);
             Assert.assertSame(
                     ((CheckpointException) environment.getCause()).getCheckpointFailureReason(),
@@ -90,6 +79,49 @@ public class AsyncCheckpointRunnableTest {
         }
     }
 
+    @Test
+    public void testDeclineAsyncCheckpoint() {
+        CheckpointFailureReason originalReason =
+                CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
+
+        final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+        snapshotsInProgress.put(
+                new OperatorID(),
+                new OperatorSnapshotFutures(
+                        DoneFuture.of(SnapshotResult.empty()),
+                        DoneFuture.of(SnapshotResult.empty()),
+                        DoneFuture.of(SnapshotResult.empty()),
+                        DoneFuture.of(SnapshotResult.empty()),
+                        ExceptionallyDoneFuture.of(new CheckpointException(originalReason)),
+                        DoneFuture.of(SnapshotResult.empty())));
+
+        final TestEnvironment environment = new TestEnvironment();
+        final AsyncCheckpointRunnable runnable =
+                createAsyncRunnable(snapshotsInProgress, environment, true);
+        runnable.run();
+
+        Assert.assertSame(
+                ((CheckpointException) environment.getCause()).getCheckpointFailureReason(),
+                originalReason);
+    }
+
+    private AsyncCheckpointRunnable createAsyncRunnable(
+            Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress,
+            TestEnvironment environment,
+            boolean isTaskRunning) {
+        return new AsyncCheckpointRunnable(
+                snapshotsInProgress,
+                new CheckpointMetaData(1, 1L),
+                new CheckpointMetrics(),
+                1L,
+                "Task Name",
+                r -> {},
+                r -> {},
+                environment,
+                (msg, ex) -> {},
+                () -> isTaskRunning);
+    }
+
     private static class TestEnvironment extends StreamMockEnvironment {
 
         Throwable cause = null;