You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/16 04:46:06 UTC

[flink] 01/02: [FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9207b81cb15d413c07f55d619667ab78f7725a9a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 15 14:49:51 2020 +0200

    [FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)
---
 .../channel/ChannelStateCheckpointWriter.java      |  7 +++++++
 .../channel/ChannelStateCheckpointWriterTest.java  | 23 ++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index ef04857..6a2b68d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
+import static java.util.Collections.emptyList;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -150,6 +151,12 @@ class ChannelStateCheckpointWriter {
 	}
 
 	private void finishWriteAndResult() throws IOException {
+		if (inputChannelOffsets.isEmpty() && resultSubpartitionOffsets.isEmpty()) {
+			dataStream.close();
+			result.inputChannelStateHandles.complete(emptyList());
+			result.resultSubpartitionStateHandles.complete(emptyList());
+			return;
+		}
 		dataStream.flush();
 		StreamStateHandle underlying = checkpointStream.closeAndGetHandle();
 		complete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 196c948..1d501f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelSta
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -37,6 +38,7 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * {@link ChannelStateCheckpointWriter} test.
@@ -47,6 +49,27 @@ public class ChannelStateCheckpointWriterTest {
 	private final Random random = new Random();
 
 	@Test
+	public void testEmptyState() throws Exception {
+		MemoryCheckpointOutputStream stream = new MemoryCheckpointOutputStream(1000) {
+			@Override
+			public StreamStateHandle closeAndGetHandle() {
+				fail("closeAndGetHandle shouldn't be called for empty channel state");
+				return null;
+			}
+		};
+		ChannelStateCheckpointWriter writer = new ChannelStateCheckpointWriter(
+				1L,
+				new ChannelStateWriteResult(),
+				stream,
+				new ChannelStateSerializerImpl(),
+				NO_OP_RUNNABLE
+		);
+		writer.completeOutput();
+		writer.completeInput();
+		assertTrue(stream.isClosed());
+	}
+
+	@Test
 	public void testRecyclingBuffers() throws Exception {
 		ChannelStateCheckpointWriter writer = createWriter(new ChannelStateWriteResult());
 		NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(10, null), FreeingBufferRecycler.INSTANCE);