You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/10 16:10:07 UTC

[flink] 03/08: [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jun 3 14:12:04 2020 +0200

    [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."
    
    This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058
    which introduced a race condition when the task thread and the netty
    thread compete for ChannelStateWriteResult.
    
    Instead, the next commits fix it as follows:
    1. The map size validation error will be prevented simply by increasing the limit
    2. When a checkpoint is subsumed, its write result will be removed on future completion
---
 .../checkpoint/channel/ChannelStateWriter.java        |  4 +++-
 .../checkpoint/channel/ChannelStateWriterImpl.java    |  1 -
 .../channel/ChannelStateWriterImplTest.java           | 19 ++++++++-----------
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index af2a708..02a3a69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable {
 
 		@Override
 		public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
-			return ChannelStateWriteResult.EMPTY;
+			return new ChannelStateWriteResult(
+				CompletableFuture.completedFuture(Collections.emptyList()),
+				CompletableFuture.completedFuture(Collections.emptyList()));
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 6158358..8996b3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void start(long checkpointId, CheckpointOptions checkpointOptions) {
-		results.keySet().forEach(oldCheckpointId -> abort(oldCheckpointId, new Exception("Starting new checkpoint " + checkpointId)));
 		LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
 		ChannelStateWriteResult result = new ChannelStateWriteResult();
 		ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 9d7a7ea..0dae88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest {
 		unwrappingError(TestException.class, () -> callStart(writer));
 	}
 
-	@Test
-	public void testStartAbortsOldCheckpoints() throws Exception {
-		int maxCheckpoints = 10;
-		runWithSyncWorker((writer, worker) -> {
-			writer.start(0, CheckpointOptions.forCheckpointWithDefaultLocation());
-			ChannelStateWriteResult writeResult = writer.getWriteResult(0);
-			for (int i = 1; i <= maxCheckpoints; i++) {
+	@Test(expected = IllegalStateException.class)
+	public void testLimit() throws IOException {
+		int maxCheckpoints = 3;
+		try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) {
+			writer.open();
+			for (int i = 0; i < maxCheckpoints; i++) {
 				writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
-				worker.processAllRequests();
-				assertTrue(writeResult.isDone());
-				writeResult = writer.getWriteResult(i);
 			}
-		});
+			writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation());
+		}
 	}
 
 	@Test(expected = IllegalStateException.class)