You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:23 UTC
[flink] 07/09: [FLINK-19047][network] Move unaligned checkpointing methods to separate interfaces
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d8250a01081e76a05f07e2f66d3adccffa811c30
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Aug 24 12:06:22 2020 +0200
[FLINK-19047][network] Move unaligned checkpointing methods to separate interfaces
---
.../network/api/writer/ResultPartitionWriter.java | 7 ----
.../partition/BoundedBlockingSubpartition.java | 6 ----
.../partition/CheckpointedResultPartition.java | 40 ++++++++++++++++++++++
.../partition/CheckpointedResultSubpartition.java | 38 ++++++++++++++++++++
.../partition/PipelinedResultPartition.java | 17 ++++++++-
.../network/partition/PipelinedSubpartition.java | 2 +-
.../io/network/partition/ResultPartition.java | 9 -----
.../io/network/partition/ResultSubpartition.java | 7 ----
...bleNotifyingResultPartitionWriterDecorator.java | 31 +++++++++++++----
.../io/network/api/writer/RecordWriterTest.java | 4 +--
.../partition/MockResultPartitionWriter.java | 5 ---
.../io/network/partition/ResultPartitionTest.java | 6 ++--
.../flink/streaming/runtime/tasks/StreamTask.java | 8 ++++-
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 15 ++++++--
.../streaming/runtime/tasks/StreamTaskTest.java | 9 ++++-
15 files changed, 153 insertions(+), 51 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 8cbb076..5d13dc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.api.writer;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -44,12 +43,6 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
*/
void setup() throws IOException;
- /**
- * Reads the previous output states with the given reader for unaligned checkpoint.
- * It should be done before task processing the inputs.
- */
- void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;
-
ResultPartitionID getPartitionId();
int getNumberOfSubpartitions();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 767bd23..30e1924 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,7 +30,6 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -146,11 +145,6 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
}
}
- @Override
- public List<Buffer> requestInflightBufferSnapshot() {
- throw new UnsupportedOperationException("The batch job does not support unaligned checkpoint.");
- }
-
private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
try {
final Buffer buffer = bufferConsumer.build();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java
new file mode 100644
index 0000000..f3e2c2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultPartition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+
+import java.io.IOException;
+
+/**
+ * Interface for partitions that are checkpointed, meaning they store data as part of unaligned checkpoints.
+ */
+public interface CheckpointedResultPartition {
+
+ /**
+ * Gets the checkpointed subpartition with the given subpartitionIndex.
+ */
+ CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex);
+
+ /**
+ * Reads the previous output states with the given reader for unaligned checkpoint.
+ * It should be done before task processing the inputs.
+ */
+ void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java
new file mode 100644
index 0000000..bf54716
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/CheckpointedResultSubpartition.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for subpartitions that are checkpointed, meaning they store data as part of unaligned checkpoints.
+ */
+public interface CheckpointedResultSubpartition {
+
+ ResultSubpartitionInfo getSubpartitionInfo();
+
+ List<Buffer> requestInflightBufferSnapshot();
+
+ void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index f61e826..142cebf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -45,7 +46,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* {@link #onConsumedSubpartition(int)}) then the partition as a whole is disposed and all buffers are
* freed.
*/
-public class PipelinedResultPartition extends ResultPartition {
+public class PipelinedResultPartition extends ResultPartition implements CheckpointedResultPartition {
/** The lock that guard release operations (which can be asynchronously propagated from the
* networks threads. */
@@ -121,6 +122,20 @@ public class PipelinedResultPartition extends ResultPartition {
}
@Override
+ public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
+ return (CheckpointedResultSubpartition) getAllPartitions()[subpartitionIndex];
+ }
+
+ @Override
+ public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
+ for (ResultSubpartition subPar : getAllPartitions()) {
+ ((PipelinedSubpartition) subPar).readRecoveredState(stateReader);
+ }
+
+ LOG.debug("{}: Finished reading recovered state.", this);
+ }
+
+ @Override
@SuppressWarnings("FieldAccessNotGuarded")
public String toString() {
return "PipelinedResultPartition " + partitionId.toString() + " [" + partitionType + ", "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index b99bf02..3463e44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -57,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
* {@link BufferConsumer} present in the queue.
*/
-public class PipelinedSubpartition extends ResultSubpartition {
+public class PipelinedSubpartition extends ResultSubpartition implements CheckpointedResultSubpartition {
private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 42e97a6..afe60f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -151,14 +150,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
partitionManager.registerResultPartition(this);
}
- @Override
- public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
- for (ResultSubpartition subpartition : subpartitions) {
- subpartition.readRecoveredState(stateReader);
- }
- LOG.debug("{}: Finished reading recovered state.", this);
- }
-
public String getOwningTaskName() {
return owningTaskName;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index c2016c6..71b4edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -19,13 +19,11 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import java.io.IOException;
-import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -77,9 +75,6 @@ public abstract class ResultSubpartition {
parent.onConsumedSubpartition(getSubPartitionIndex());
}
- public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
- }
-
/**
* Adds the given buffer.
*
@@ -117,8 +112,6 @@ public abstract class ResultSubpartition {
return add(bufferConsumer, false);
}
- public abstract List<Buffer> requestInflightBufferSnapshot();
-
public abstract void flush();
public abstract void finish() throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index c1a0ab4..e8d8816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
@@ -42,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* results, receivers are deployed as soon as the first buffer is added to the result partition.
* With blocking results on the other hand, receivers are deployed after the partition is finished.
*/
-public class ConsumableNotifyingResultPartitionWriterDecorator implements ResultPartitionWriter {
+public class ConsumableNotifyingResultPartitionWriterDecorator implements ResultPartitionWriter, CheckpointedResultPartition {
private final TaskActions taskActions;
@@ -101,11 +103,6 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
}
@Override
- public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
- partitionWriter.readRecoveredState(stateReader);
- }
-
- @Override
public boolean addBufferConsumer(
BufferConsumer bufferConsumer,
int subpartitionIndex,
@@ -165,6 +162,28 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
}
// ------------------------------------------------------------------------
+ // checkpointable methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
+ return getCheckpointablePartition().getCheckpointedSubpartition(subpartitionIndex);
+ }
+
+ @Override
+ public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
+ getCheckpointablePartition().readRecoveredState(stateReader);
+ }
+
+ private CheckpointedResultPartition getCheckpointablePartition() {
+ if (partitionWriter instanceof CheckpointedResultPartition) {
+ return (CheckpointedResultPartition) partitionWriter;
+ } else {
+ throw new IllegalStateException("This partition is not checkpointable: " + partitionWriter);
+ }
+ }
+
+ // ------------------------------------------------------------------------
// Factory
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 1593cd6..0f87c4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -45,9 +45,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
@@ -457,7 +457,7 @@ public class RecordWriterTest {
final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
final ChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states);
- final ResultPartition partition = new ResultPartitionBuilder()
+ final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
.setNetworkBufferPool(globalPool)
.build();
final RecordWriter<IntValue> recordWriter = new RecordWriterBuilder<IntValue>().build(partition);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index d6a7b50..518fed4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -40,10 +39,6 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
}
@Override
- public void readRecoveredState(ChannelStateReader stateReader) {
- }
-
- @Override
public ResultPartitionID getPartitionId() {
return partitionId;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 23e61c8..7f419ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -449,7 +449,7 @@ public class ResultPartitionTest {
public void testInitializeEmptyState() throws Exception {
final int totalBuffers = 2;
final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1);
- final ResultPartition partition = new ResultPartitionBuilder()
+ final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
.setNetworkBufferPool(globalPool)
.build();
final ChannelStateReader stateReader = ChannelStateReader.NO_OP;
@@ -481,7 +481,7 @@ public class ResultPartitionTest {
final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
final ChannelStateReader stateReader = new FiniteChannelStateReader(totalStates, states);
- final ResultPartition partition = new ResultPartitionBuilder()
+ final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
.setNetworkBufferPool(globalPool)
.build();
final ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -540,7 +540,7 @@ public class ResultPartitionTest {
public void testReadRecoveredStateWithException() throws Exception {
final int totalBuffers = 2;
final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1);
- final ResultPartition partition = new ResultPartitionBuilder()
+ final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
.setNetworkBufferPool(globalPool)
.build();
final ChannelStateReader stateReader = new ChannelStateReaderWithException();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 20c7557..c4b1be5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -494,7 +495,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ResultPartitionWriter[] writers = getEnvironment().getAllWriters();
if (writers != null) {
for (ResultPartitionWriter writer : writers) {
- writer.readRecoveredState(reader);
+ if (writer instanceof CheckpointedResultPartition) {
+ ((CheckpointedResultPartition) writer).readRecoveredState(reader);
+ } else {
+ throw new IllegalStateException(
+ "Cannot restore state to a non-checkpointable partition type: " + writer);
+ }
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 45650ae..809b432 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -33,7 +33,8 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
@@ -428,8 +429,9 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
ResultPartitionWriter[] writers = env.getAllWriters();
for (ResultPartitionWriter writer : writers) {
+ final CheckpointedResultPartition checkpointedPartition = checkCheckpointedResultPartition(writer);
for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
- ResultSubpartition subpartition = writer.getSubpartition(i);
+ CheckpointedResultSubpartition subpartition = checkpointedPartition.getCheckpointedSubpartition(i);
channelStateWriter.addOutputData(
checkpointId,
subpartition.getSubpartitionInfo(),
@@ -448,6 +450,15 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
});
}
+ private static CheckpointedResultPartition checkCheckpointedResultPartition(ResultPartitionWriter partition) {
+ if (partition instanceof CheckpointedResultPartition) {
+ return (CheckpointedResultPartition) partition;
+ } else {
+ throw new IllegalStateException(
+ "Cannot take a checkpoint of a partition type that is not checkpointed: " + partition);
+ }
+ }
+
private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) {
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
executorService.execute(new AsyncCheckpointRunnable(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 4670691..5462c19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -48,6 +48,8 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
+import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -1986,13 +1988,18 @@ public class StreamTaskTest extends TestLogger {
}
}
- private static class RecoveryResultPartition extends MockResultPartitionWriter {
+ private static class RecoveryResultPartition extends MockResultPartitionWriter implements CheckpointedResultPartition {
private boolean isStateRecovered;
RecoveryResultPartition() {
}
@Override
+ public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
+ return (CheckpointedResultSubpartition) super.getSubpartition(subpartitionIndex);
+ }
+
+ @Override
public void readRecoveredState(ChannelStateReader stateReader) {
isStateRecovered = true;
}