You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:06:46 UTC

[flink] 02/02: [FLINK-21215][task] Optimise order of cancelling AsyncCheckpointRunnable and notifying JobManager

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6eeacdb237d32c29f95dbdb6f68ac5e7c81fb2f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 1 14:34:24 2021 +0100

    [FLINK-21215][task] Optimise order of cancelling AsyncCheckpointRunnable and notifying JobManager
    
    The order of cancelling AsyncCheckpointRunnable and notifying the JM was sub-optimal.
    Instead of notifying the JM first, we were first cancelling the future, which could
    create a race condition where the secondary failure was reported first, causing
    slightly less readable logs/error messages.
---
 .../SingleCheckpointBarrierHandler.java            |  2 +-
 .../io/checkpointing/UnalignedControllerTest.java  | 32 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 6a0c148..f2e2381 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -257,8 +257,8 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
                 Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
         numBarriersReceived = 0;
         controller.abortPendingCheckpoint(cancelledId, exception);
-        allBarriersReceivedFuture.completeExceptionally(exception);
         notifyAbort(cancelledId, exception);
+        allBarriersReceivedFuture.completeExceptionally(exception);
     }
 
     @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java
index 9aace51..05de6aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java
@@ -49,6 +49,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,6 +61,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -556,6 +559,16 @@ public class UnalignedControllerTest {
     }
 
     @Test
+    public void testNotifyAbortCheckpointBeforeCanellingAsyncCheckpoint() throws Exception {
+        // Cancelling checkpoint 1 triggers the abort callback, which performs the assertion.
+        ValidateAsyncFutureNotCompleted abortValidator = new ValidateAsyncFutureNotCompleted(1);
+        inputGate = createInputGate(2, abortValidator);
+        abortValidator.setInputGate(inputGate);
+        addSequence(inputGate, createBarrier(1, 0), createCancellationBarrier(1, 1));
+        addSequence(inputGate, createEndOfPartition(0), createEndOfPartition(1));
+    }
+
+    @Test
     public void testSingleChannelAbortCheckpoint() throws Exception {
         ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1);
         inputGate = createInputGate(1, handler);
@@ -975,6 +988,25 @@ public class UnalignedControllerTest {
         }
     }
 
+    static class ValidateAsyncFutureNotCompleted extends ValidatingCheckpointHandler {
+        /** Gate inspected on abort; set later because the gate is created with this handler. */
+        private @Nullable CheckpointedInputGate gate;
+        public ValidateAsyncFutureNotCompleted(long expectedCheckpointId) {
+            super(expectedCheckpointId);
+        }
+
+        public void setInputGate(CheckpointedInputGate inputGate) {
+            gate = inputGate;
+        }
+        /** Delegates, then fails if the gate is unset or the barriers future is already done. */
+        @Override
+        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
+            super.abortCheckpointOnBarrier(checkpointId, cause);
+            checkState(gate != null);
+            assertFalse(gate.getAllBarriersReceivedFuture(checkpointId).isDone());
+        }
+    }
+
     /**
      * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint id
      * is executed and how many checkpoints are executed.