You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:23 UTC

[flink] 07/09: [FLINK-19047][network] Move unaligned checkpointing methods to separate interfaces

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8250a01081e76a05f07e2f66d3adccffa811c30
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Aug 24 12:06:22 2020 +0200

    [FLINK-19047][network] Move unaligned checkpointing methods to separate interfaces
---
 .../network/api/writer/ResultPartitionWriter.java  |  7 ----
 .../partition/BoundedBlockingSubpartition.java     |  6 ----
 .../partition/CheckpointedResultPartition.java     | 40 ++++++++++++++++++++++
 .../partition/CheckpointedResultSubpartition.java  | 38 ++++++++++++++++++++
 .../partition/PipelinedResultPartition.java        | 17 ++++++++-
 .../network/partition/PipelinedSubpartition.java   |  2 +-
 .../io/network/partition/ResultPartition.java      |  9 -----
 .../io/network/partition/ResultSubpartition.java   |  7 ----
 ...bleNotifyingResultPartitionWriterDecorator.java | 31 +++++++++++++----
 .../io/network/api/writer/RecordWriterTest.java    |  4 +--
 .../partition/MockResultPartitionWriter.java       |  5 ---
 .../io/network/partition/ResultPartitionTest.java  |  6 ++--
 .../flink/streaming/runtime/tasks/StreamTask.java  |  8 ++++-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 15 ++++++--
 .../streaming/runtime/tasks/StreamTaskTest.java    |  9 ++++-
 15 files changed, 153 insertions(+), 51 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 8cbb076..5d13dc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -44,12 +43,6 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	 */
 	void setup() throws IOException;
 
-	/**
-	 * Reads the previous output states with the given reader for unaligned checkpoint.
-	 * It should be done before task processing the inputs.
-	 */
-	void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;
-
 	ResultPartitionID getPartitionId();
 
 	int getNumberOfSubpartitions();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 767bd23..30e1924 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,7 +30,6 @@ import javax.annotation.concurrent.GuardedBy;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -146,11 +145,6 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
 		}
 	}
 
-	@Override
-	public List<Buffer> requestInflightBufferSnapshot() {
-		throw new UnsupportedOperationException("The batch job does not support unaligned checkpoint.");
-	}
-
 	private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
 		try {
 			final Buffer buffer = bufferConsumer.build();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java
new file mode 100644
index 0000000..f3e2c2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+
+import java.io.IOException;
+
+/**
+ * Interface for partitions that are checkpointed, meaning they store data as part of unaligned checkpoints.
+ */
+public interface CheckpointedResultPartition {
+
+	/**
+	 * Gets the checkpointed subpartition with the given subpartitionIndex.
+	 */
+	CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex);
+
+	/**
+	 * Reads the previous output states with the given reader for unaligned checkpoints.
+	 * This should be done before the task starts processing its inputs.
+	 */
+	void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java
new file mode 100644
index 0000000..bf54716
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for subpartitions that are checkpointed, meaning they store data as part of unaligned checkpoints.
+ */
+public interface CheckpointedResultSubpartition {
+
+	ResultSubpartitionInfo getSubpartitionInfo();
+
+	List<Buffer> requestInflightBufferSnapshot();
+
+	void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index f61e826..142cebf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -45,7 +46,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * {@link #onConsumedSubpartition(int)}) then the partition as a whole is disposed and all buffers are
  * freed.
  */
-public class PipelinedResultPartition extends ResultPartition {
+public class PipelinedResultPartition extends ResultPartition implements CheckpointedResultPartition {
 
 	/** The lock that guard release operations (which can be asynchronously propagated from the
 	 * networks threads. */
@@ -121,6 +122,20 @@ public class PipelinedResultPartition extends ResultPartition {
 	}
 
 	@Override
+	public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
+		return (CheckpointedResultSubpartition) getAllPartitions()[subpartitionIndex];
+	}
+
+	@Override
+	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
+		for (ResultSubpartition subPar : getAllPartitions()) {
+			((PipelinedSubpartition) subPar).readRecoveredState(stateReader);
+		}
+
+		LOG.debug("{}: Finished reading recovered state.", this);
+	}
+
+	@Override
 	@SuppressWarnings("FieldAccessNotGuarded")
 	public String toString() {
 		return "PipelinedResultPartition " + partitionId.toString() + " [" + partitionType + ", "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index b99bf02..3463e44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -57,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
  * {@link BufferConsumer} present in the queue.
  */
-public class PipelinedSubpartition extends ResultSubpartition {
+public class PipelinedSubpartition extends ResultSubpartition implements CheckpointedResultSubpartition {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 42e97a6..afe60f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -151,14 +150,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		partitionManager.registerResultPartition(this);
 	}
 
-	@Override
-	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.readRecoveredState(stateReader);
-		}
-		LOG.debug("{}: Finished reading recovered state.", this);
-	}
-
 	public String getOwningTaskName() {
 		return owningTaskName;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index c2016c6..71b4edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -19,13 +19,11 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
 import java.io.IOException;
-import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -77,9 +75,6 @@ public abstract class ResultSubpartition {
 		parent.onConsumedSubpartition(getSubPartitionIndex());
 	}
 
-	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
-	}
-
 	/**
 	 * Adds the given buffer.
 	 *
@@ -117,8 +112,6 @@ public abstract class ResultSubpartition {
 		return add(bufferConsumer, false);
 	}
 
-	public abstract List<Buffer> requestInflightBufferSnapshot();
-
 	public abstract void flush();
 
 	public abstract void finish() throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index c1a0ab4..e8d8816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
@@ -42,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * results, receivers are deployed as soon as the first buffer is added to the result partition.
  * With blocking results on the other hand, receivers are deployed after the partition is finished.
  */
-public class ConsumableNotifyingResultPartitionWriterDecorator implements ResultPartitionWriter {
+public class ConsumableNotifyingResultPartitionWriterDecorator implements ResultPartitionWriter, CheckpointedResultPartition {
 
 	private final TaskActions taskActions;
 
@@ -101,11 +103,6 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
-	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
-		partitionWriter.readRecoveredState(stateReader);
-	}
-
-	@Override
 	public boolean addBufferConsumer(
 			BufferConsumer bufferConsumer,
 			int subpartitionIndex,
@@ -165,6 +162,28 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	// ------------------------------------------------------------------------
+	//  checkpointable methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
+		return getCheckpointablePartition().getCheckpointedSubpartition(subpartitionIndex);
+	}
+
+	@Override
+	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
+		getCheckpointablePartition().readRecoveredState(stateReader);
+	}
+
+	private CheckpointedResultPartition getCheckpointablePartition() {
+		if (partitionWriter instanceof CheckpointedResultPartition) {
+			return (CheckpointedResultPartition) partitionWriter;
+		} else {
+			throw new IllegalStateException("This partition is not checkpointable: " + partitionWriter);
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  Factory
 	// ------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 1593cd6..0f87c4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -45,9 +45,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
@@ -457,7 +457,7 @@ public class RecordWriterTest {
 
 		final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
 		final ChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states);
-		final ResultPartition partition = new ResultPartitionBuilder()
+		final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
 			.setNetworkBufferPool(globalPool)
 			.build();
 		final RecordWriter<IntValue> recordWriter = new RecordWriterBuilder<IntValue>().build(partition);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index d6a7b50..518fed4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -40,10 +39,6 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 	}
 
 	@Override
-	public void readRecoveredState(ChannelStateReader stateReader) {
-	}
-
-	@Override
 	public ResultPartitionID getPartitionId() {
 		return partitionId;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 23e61c8..7f419ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -449,7 +449,7 @@ public class ResultPartitionTest {
 	public void testInitializeEmptyState() throws Exception {
 		final int totalBuffers = 2;
 		final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1);
-		final ResultPartition partition = new ResultPartitionBuilder()
+		final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
 			.setNetworkBufferPool(globalPool)
 			.build();
 		final ChannelStateReader stateReader = ChannelStateReader.NO_OP;
@@ -481,7 +481,7 @@ public class ResultPartitionTest {
 
 		final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
 		final ChannelStateReader stateReader = new FiniteChannelStateReader(totalStates, states);
-		final ResultPartition partition = new ResultPartitionBuilder()
+		final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
 			.setNetworkBufferPool(globalPool)
 			.build();
 		final ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -540,7 +540,7 @@ public class ResultPartitionTest {
 	public void testReadRecoveredStateWithException() throws Exception {
 		final int totalBuffers = 2;
 		final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1);
-		final ResultPartition partition = new ResultPartitionBuilder()
+		final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
 			.setNetworkBufferPool(globalPool)
 			.build();
 		final ChannelStateReader stateReader = new ChannelStateReaderWithException();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 20c7557..c4b1be5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -494,7 +495,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		ResultPartitionWriter[] writers = getEnvironment().getAllWriters();
 		if (writers != null) {
 			for (ResultPartitionWriter writer : writers) {
-				writer.readRecoveredState(reader);
+				if (writer instanceof CheckpointedResultPartition) {
+					((CheckpointedResultPartition) writer).readRecoveredState(reader);
+				} else {
+					throw new IllegalStateException(
+							"Cannot restore state to a non-checkpointable partition type: " + writer);
+				}
 			}
 		}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 45650ae..809b432 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -33,7 +33,8 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
@@ -428,8 +429,9 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 	private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
 		ResultPartitionWriter[] writers = env.getAllWriters();
 		for (ResultPartitionWriter writer : writers) {
+			final CheckpointedResultPartition checkpointedPartition = checkCheckpointedResultPartition(writer);
 			for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
-				ResultSubpartition subpartition = writer.getSubpartition(i);
+				CheckpointedResultSubpartition subpartition = checkpointedPartition.getCheckpointedSubpartition(i);
 				channelStateWriter.addOutputData(
 					checkpointId,
 					subpartition.getSubpartitionInfo(),
@@ -448,6 +450,15 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			});
 	}
 
+	private static CheckpointedResultPartition checkCheckpointedResultPartition(ResultPartitionWriter partition) {
+		if (partition instanceof CheckpointedResultPartition) {
+			return (CheckpointedResultPartition) partition;
+		} else {
+			throw new IllegalStateException(
+					"Cannot take a checkpoint of a partition type that is not checkpointed: " + partition);
+		}
+	}
+
 	private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) {
 		// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
 		executorService.execute(new AsyncCheckpointRunnable(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 4670691..5462c19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -48,6 +48,8 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -1986,13 +1988,18 @@ public class StreamTaskTest extends TestLogger {
 		}
 	}
 
-	private static class RecoveryResultPartition extends MockResultPartitionWriter {
+	private static class RecoveryResultPartition extends MockResultPartitionWriter implements CheckpointedResultPartition {
 		private boolean isStateRecovered;
 
 		RecoveryResultPartition() {
 		}
 
 		@Override
+		public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
+			return (CheckpointedResultSubpartition) super.getSubpartition(subpartitionIndex);
+		}
+
+		@Override
 		public void readRecoveredState(ChannelStateReader stateReader) {
 			isStateRecovered = true;
 		}