You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/10 16:10:07 UTC
[flink] 03/08: [FLINK-17869][task][checkpointing] Revert
"[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts
previous checkpoints before a new checkpoint is started."
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jun 3 14:12:04 2020 +0200
[FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."
This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058
which introduced a race condition when the task thread and the netty
thread compete for ChannelStateWriteResult.
Instead, next commits fix it by:
1. Map size validation error will be prevented simply by increasing the limit
2. When a checkpoint is subsumed, its write result will be removed on future completion
---
.../checkpoint/channel/ChannelStateWriter.java | 4 +++-
.../checkpoint/channel/ChannelStateWriterImpl.java | 1 -
.../channel/ChannelStateWriterImplTest.java | 19 ++++++++-----------
3 files changed, 11 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index af2a708..02a3a69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable {
@Override
public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
- return ChannelStateWriteResult.EMPTY;
+ return new ChannelStateWriteResult(
+ CompletableFuture.completedFuture(Collections.emptyList()),
+ CompletableFuture.completedFuture(Collections.emptyList()));
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 6158358..8996b3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void start(long checkpointId, CheckpointOptions checkpointOptions) {
- results.keySet().forEach(oldCheckpointId -> abort(oldCheckpointId, new Exception("Starting new checkpoint " + checkpointId)));
LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 9d7a7ea..0dae88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest {
unwrappingError(TestException.class, () -> callStart(writer));
}
- @Test
- public void testStartAbortsOldCheckpoints() throws Exception {
- int maxCheckpoints = 10;
- runWithSyncWorker((writer, worker) -> {
- writer.start(0, CheckpointOptions.forCheckpointWithDefaultLocation());
- ChannelStateWriteResult writeResult = writer.getWriteResult(0);
- for (int i = 1; i <= maxCheckpoints; i++) {
+ @Test(expected = IllegalStateException.class)
+ public void testLimit() throws IOException {
+ int maxCheckpoints = 3;
+ try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) {
+ writer.open();
+ for (int i = 0; i < maxCheckpoints; i++) {
writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
- worker.processAllRequests();
- assertTrue(writeResult.isDone());
- writeResult = writer.getWriteResult(i);
}
- });
+ writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation());
+ }
}
@Test(expected = IllegalStateException.class)