You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/16 04:46:06 UTC
[flink] 01/02: [FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9207b81cb15d413c07f55d619667ab78f7725a9a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 15 14:49:51 2020 +0200
[FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)
---
.../channel/ChannelStateCheckpointWriter.java | 7 +++++++
.../channel/ChannelStateCheckpointWriterTest.java | 23 ++++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index ef04857..6a2b68d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
+import static java.util.Collections.emptyList;
import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -150,6 +151,12 @@ class ChannelStateCheckpointWriter {
}
private void finishWriteAndResult() throws IOException {
+ if (inputChannelOffsets.isEmpty() && resultSubpartitionOffsets.isEmpty()) {
+ dataStream.close();
+ result.inputChannelStateHandles.complete(emptyList());
+ result.resultSubpartitionStateHandles.complete(emptyList());
+ return;
+ }
dataStream.flush();
StreamStateHandle underlying = checkpointStream.closeAndGetHandle();
complete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 196c948..1d501f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelSta
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
import org.apache.flink.util.function.RunnableWithException;
@@ -37,6 +38,7 @@ import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* {@link ChannelStateCheckpointWriter} test.
@@ -47,6 +49,27 @@ public class ChannelStateCheckpointWriterTest {
private final Random random = new Random();
@Test
+ public void testEmptyState() throws Exception {
+ MemoryCheckpointOutputStream stream = new MemoryCheckpointOutputStream(1000) {
+ @Override
+ public StreamStateHandle closeAndGetHandle() {
+ fail("closeAndGetHandle shouldn't be called for empty channel state");
+ return null;
+ }
+ };
+ ChannelStateCheckpointWriter writer = new ChannelStateCheckpointWriter(
+ 1L,
+ new ChannelStateWriteResult(),
+ stream,
+ new ChannelStateSerializerImpl(),
+ NO_OP_RUNNABLE
+ );
+ writer.completeOutput();
+ writer.completeInput();
+ assertTrue(stream.isClosed());
+ }
+
+ @Test
public void testRecyclingBuffers() throws Exception {
ChannelStateCheckpointWriter writer = createWriter(new ChannelStateWriteResult());
NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(10, null), FreeingBufferRecycler.INSTANCE);