You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:06:46 UTC
[flink] 02/02: [FLINK-21215][task] Optimise order of cancelling
AsyncCheckpointRunnable and notifying JobManager
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6eeacdb237d32c29f95dbdb6f68ac5e7c81fb2f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 1 14:34:24 2021 +0100
[FLINK-21215][task] Optimise order of cancelling AsyncCheckpointRunnable and notifying JobManager
The order of cancelling AsyncCheckpointRunnable and notifying the JM was sub-optimal.
Instead of notifying the JM first, we were first cancelling the future, which could
create a race condition where the secondary failure was reported first, causing
slightly less readable logs/error messages.
---
.../SingleCheckpointBarrierHandler.java | 2 +-
.../io/checkpointing/UnalignedControllerTest.java | 32 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 6a0c148..f2e2381 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -257,8 +257,8 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
numBarriersReceived = 0;
controller.abortPendingCheckpoint(cancelledId, exception);
- allBarriersReceivedFuture.completeExceptionally(exception);
notifyAbort(cancelledId, exception);
+ allBarriersReceivedFuture.completeExceptionally(exception);
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java
index 9aace51..05de6aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedControllerTest.java
@@ -49,6 +49,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -59,6 +61,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -556,6 +559,16 @@ public class UnalignedControllerTest {
}
@Test
+ public void testNotifyAbortCheckpointBeforeCancellingAsyncCheckpoint() throws Exception {
+ ValidateAsyncFutureNotCompleted handler = new ValidateAsyncFutureNotCompleted(1);
+ inputGate = createInputGate(2, handler);
+ handler.setInputGate(inputGate);
+ // Expected to abort checkpoint 1; the handler asserts the all-barriers future is still pending at that point.
+ addSequence(inputGate, createBarrier(1, 0), createCancellationBarrier(1, 1));
+ addSequence(inputGate, createEndOfPartition(0), createEndOfPartition(1));
+ }
+
+ @Test
public void testSingleChannelAbortCheckpoint() throws Exception {
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1);
inputGate = createInputGate(1, handler);
@@ -975,6 +988,25 @@ public class UnalignedControllerTest {
}
}
+ static class ValidateAsyncFutureNotCompleted extends ValidatingCheckpointHandler { // fails if an abort is reported after the all-barriers-received future is done
+ private @Nullable CheckpointedInputGate inputGate; // set via setInputGate() before any abort is reported
+
+ public ValidateAsyncFutureNotCompleted(long nextExpectedCheckpointId) {
+ super(nextExpectedCheckpointId);
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
+ super.abortCheckpointOnBarrier(checkpointId, cause);
+ checkState(inputGate != null); // setInputGate() must have been called first
+ assertFalse(inputGate.getAllBarriersReceivedFuture(checkpointId).isDone()); // abort must be reported before the future completes
+ }
+
+ public void setInputGate(CheckpointedInputGate inputGate) {
+ this.inputGate = inputGate;
+ }
+ }
+
/**
* Specific {@link AbstractInvokable} implementation to record and validate which checkpoint id
* is executed and how many checkpoints are executed.