You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:14 UTC

[flink] 03/07: [FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 88a07a9eb65514cf1874a27afc72bd430d87d11f
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Mon Aug 31 15:30:20 2020 +0800

    [FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic
    
    In the current abstraction, buffers are written to and read from ResultSubpartitions, which is a hash-style data writing and reading implementation. This is in contrast to implementations where records are appended to a joint structure, from which the data is drawn after the write phase is finished, for example the sort-based partitioning which clusters data belonging to different channels by sorting channel index. In the future, sort-merge based ResultPartitionWriter will be implemen [...]
---
 .../io/network/metrics/ResultPartitionMetrics.java |  32 ++--
 .../partition/BoundedBlockingResultPartition.java  |   2 +-
 .../partition/BufferWritingResultPartition.java    | 176 +++++++++++++++++++++
 .../partition/PipelinedResultPartition.java        |   6 +-
 .../io/network/partition/ResultPartition.java      | 121 +++-----------
 ...oundedBlockingSubpartitionAvailabilityTest.java |   2 +-
 .../partition/FileChannelBoundedDataTest.java      |   2 +-
 .../network/partition/InputGateFairnessTest.java   |  12 +-
 .../partition/ResultPartitionFactoryTest.java      |   4 +-
 .../io/network/partition/ResultPartitionTest.java  |   2 +-
 .../partition/consumer/LocalInputChannelTest.java  |   5 +-
 11 files changed, 227 insertions(+), 137 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
index 2171ff3..e3ba640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.metrics;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -49,13 +48,7 @@ public class ResultPartitionMetrics {
 	 * @return total number of queued buffers
 	 */
 	long refreshAndGetTotal() {
-		long total = 0;
-
-		for (ResultSubpartition part : partition.getAllPartitions()) {
-			total += part.unsynchronizedGetNumberOfQueuedBuffers();
-		}
-
-		return total;
+		return partition.getNumberOfQueuedBuffers();
 	}
 
 	/**
@@ -66,15 +59,15 @@ public class ResultPartitionMetrics {
 	 */
 	int refreshAndGetMin() {
 		int min = Integer.MAX_VALUE;
+		int numSubpartitions = partition.getNumberOfSubpartitions();
 
-		ResultSubpartition[] allPartitions = partition.getAllPartitions();
-		if (allPartitions.length == 0) {
+		if (numSubpartitions == 0) {
 			// meaningful value when no channels exist:
 			return 0;
 		}
 
-		for (ResultSubpartition part : allPartitions) {
-			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+		for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
+			int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
 			min = Math.min(min, size);
 		}
 
@@ -89,9 +82,10 @@ public class ResultPartitionMetrics {
 	 */
 	int refreshAndGetMax() {
 		int max = 0;
+		int numSubpartitions = partition.getNumberOfSubpartitions();
 
-		for (ResultSubpartition part : partition.getAllPartitions()) {
-			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+		for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
+			int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
 			max = Math.max(max, size);
 		}
 
@@ -105,15 +99,7 @@ public class ResultPartitionMetrics {
 	 * @return average number of queued buffers per sub-partition
 	 */
 	float refreshAndGetAvg() {
-		long total = 0;
-
-		ResultSubpartition[] allPartitions = partition.getAllPartitions();
-		for (ResultSubpartition part : allPartitions) {
-			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
-			total += size;
-		}
-
-		return total / (float) allPartitions.length;
+		return partition.getNumberOfQueuedBuffers() / (float) partition.getNumberOfSubpartitions();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
index fc440a0..a16cfe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * per sub-partition. This implementation hence requires at least as many files (file handles) and
  * memory buffers as the parallelism of the target task that the data is shuffled to.
  */
-public class BoundedBlockingResultPartition extends ResultPartition {
+public class BoundedBlockingResultPartition extends BufferWritingResultPartition {
 
 	public BoundedBlockingResultPartition(
 			String owningTaskName,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
new file mode 100644
index 0000000..437d0a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link ResultPartition} which writes buffers directly to {@link ResultSubpartition}s. This
+ * is in contrast to implementations where records are written to a joint structure, from which
+ * the subpartitions draw the data after the write phase is finished, for example the sort-based
+ * partitioning.
+ *
+ * <p>To avoid confusion: On the read side, all subpartitions return buffers (and backlog) to be
+ * transported through the network.
+ */
+public class BufferWritingResultPartition extends ResultPartition {
+
+	/** The subpartitions of this partition. At least one. */
+	protected final ResultSubpartition[] subpartitions;
+
+	public BufferWritingResultPartition(
+		String owningTaskName,
+		int partitionIndex,
+		ResultPartitionID partitionId,
+		ResultPartitionType partitionType,
+		ResultSubpartition[] subpartitions,
+		int numTargetKeyGroups,
+		ResultPartitionManager partitionManager,
+		@Nullable BufferCompressor bufferCompressor,
+		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
+
+		super(
+			owningTaskName,
+			partitionIndex,
+			partitionId,
+			partitionType,
+			subpartitions.length,
+			numTargetKeyGroups,
+			partitionManager,
+			bufferCompressor,
+			bufferPoolFactory);
+
+		this.subpartitions = checkNotNull(subpartitions);
+	}
+
+	public int getNumberOfQueuedBuffers() {
+		int totalBuffers = 0;
+
+		for (ResultSubpartition subpartition : subpartitions) {
+			totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
+		}
+
+		return totalBuffers;
+	}
+
+	public int getNumberOfQueuedBuffers(int targetSubpartition) {
+		checkArgument(targetSubpartition >= 0 && targetSubpartition < numSubpartitions);
+		return subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
+	}
+
+	@Override
+	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
+		checkInProduceState();
+
+		return bufferPool.requestBufferBuilderBlocking(targetChannel);
+	}
+
+	@Override
+	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
+		return bufferPool.requestBufferBuilder(targetChannel);
+	}
+
+	@Override
+	public boolean addBufferConsumer(
+			BufferConsumer bufferConsumer,
+			int subpartitionIndex) throws IOException {
+		checkNotNull(bufferConsumer);
+
+		ResultSubpartition subpartition;
+		try {
+			checkInProduceState();
+			subpartition = subpartitions[subpartitionIndex];
+		}
+		catch (Exception ex) {
+			bufferConsumer.close();
+			throw ex;
+		}
+
+		return subpartition.add(bufferConsumer);
+	}
+
+	@Override
+	public void flushAll() {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
+	}
+
+	@Override
+	public void flush(int targetSubpartition) {
+		subpartitions[targetSubpartition].flush();
+	}
+
+	@Override
+	public ResultSubpartitionView createSubpartitionView(
+			int subpartitionIndex,
+			BufferAvailabilityListener availabilityListener) throws IOException {
+		checkElementIndex(subpartitionIndex, numSubpartitions, "Subpartition not found.");
+		checkState(!isReleased(), "Partition released.");
+
+		ResultSubpartition subpartition = subpartitions[subpartitionIndex];
+		ResultSubpartitionView readView = subpartition.createReadView(availabilityListener);
+
+		LOG.debug("Created {}", readView);
+
+		return readView;
+	}
+
+	@Override
+	public void finish() throws IOException {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.finish();
+		}
+		super.finish();
+	}
+
+	@Override
+	protected void releaseInternal() {
+		// Release all subpartitions
+		for (ResultSubpartition subpartition : subpartitions) {
+			try {
+				subpartition.release();
+			}
+			// Catch this in order to ensure that release is called on all subpartitions
+			catch (Throwable t) {
+				LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
+			}
+		}
+	}
+
+	@VisibleForTesting
+	public ResultSubpartition[] getAllPartitions() {
+		return subpartitions;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index dc9da76..a1888c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * {@link #onConsumedSubpartition(int)}) then the partition as a whole is disposed and all buffers are
  * freed.
  */
-public class PipelinedResultPartition extends ResultPartition
+public class PipelinedResultPartition extends BufferWritingResultPartition
 		implements CheckpointedResultPartition, ChannelStateHolder {
 
 	/** The lock that guard release operations (which can be asynchronously propagated from the
@@ -134,12 +134,12 @@ public class PipelinedResultPartition extends ResultPartition
 
 	@Override
 	public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
-		return (CheckpointedResultSubpartition) getAllPartitions()[subpartitionIndex];
+		return (CheckpointedResultSubpartition) subpartitions[subpartitionIndex];
 	}
 
 	@Override
 	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
-		for (ResultSubpartition subPar : getAllPartitions()) {
+		for (ResultSubpartition subPar : subpartitions) {
 			((PipelinedSubpartition) subPar).readRecoveredState(stateReader);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 15430b9..02c91c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,9 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -43,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -52,8 +49,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
  * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
- * or more {@link ResultSubpartition} instances, which further partition the data depending on the
- * number of consuming tasks and the data {@link DistributionPattern}.
+ * or more {@link ResultSubpartition} instances or in a joint structure which further partition the
+ * data depending on the number of consuming tasks and the data {@link DistributionPattern}.
  *
  * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
  * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
@@ -84,18 +81,17 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	/** Type of this partition. Defines the concrete subpartition implementation to use. */
 	protected final ResultPartitionType partitionType;
 
-	/** The subpartitions of this partition. At least one. */
-	protected final ResultSubpartition[] subpartitions;
-
 	protected final ResultPartitionManager partitionManager;
 
+	protected final int numSubpartitions;
+
 	private final int numTargetKeyGroups;
 
 	// - Runtime state --------------------------------------------------------
 
 	private final AtomicBoolean isReleased = new AtomicBoolean();
 
-	private BufferPool bufferPool;
+	protected BufferPool bufferPool;
 
 	private boolean isFinished;
 
@@ -112,7 +108,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		int partitionIndex,
 		ResultPartitionID partitionId,
 		ResultPartitionType partitionType,
-		ResultSubpartition[] subpartitions,
+		int numSubpartitions,
 		int numTargetKeyGroups,
 		ResultPartitionManager partitionManager,
 		@Nullable BufferCompressor bufferCompressor,
@@ -123,7 +119,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		this.partitionIndex = partitionIndex;
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
-		this.subpartitions = checkNotNull(subpartitions);
+		this.numSubpartitions = numSubpartitions;
 		this.numTargetKeyGroups = numTargetKeyGroups;
 		this.partitionManager = checkNotNull(partitionManager);
 		this.bufferCompressor = bufferCompressor;
@@ -164,22 +160,22 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 
 	@Override
 	public int getNumberOfSubpartitions() {
-		return subpartitions.length;
+		return numSubpartitions;
 	}
 
 	public BufferPool getBufferPool() {
 		return bufferPool;
 	}
 
-	public int getNumberOfQueuedBuffers() {
-		int totalBuffers = 0;
-
-		for (ResultSubpartition subpartition : subpartitions) {
-			totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
-		}
+	/**
+	 * Returns the total number of queued buffers of all subpartitions.
+	 */
+	public abstract int getNumberOfQueuedBuffers();
 
-		return totalBuffers;
-	}
+	/**
+	 * Returns the number of queued buffers of the given target subpartition.
+	 */
+	public abstract int getNumberOfQueuedBuffers(int targetSubpartition);
 
 	/**
 	 * Returns the type of this result partition.
@@ -192,48 +188,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 
 	// ------------------------------------------------------------------------
 
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkInProduceState();
-
-		return bufferPool.requestBufferBuilderBlocking(targetChannel);
-	}
-
-	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetChannel);
-		return bufferBuilder;
-	}
-
-	@Override
-	public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
-		checkNotNull(bufferConsumer);
-
-		ResultSubpartition subpartition;
-		try {
-			checkInProduceState();
-			subpartition = subpartitions[subpartitionIndex];
-		}
-		catch (Exception ex) {
-			bufferConsumer.close();
-			throw ex;
-		}
-
-		return subpartition.add(bufferConsumer);
-	}
-
-	@Override
-	public void flushAll() {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.flush();
-		}
-	}
-
-	@Override
-	public void flush(int subpartitionIndex) {
-		subpartitions[subpartitionIndex].flush();
-	}
-
 	/**
 	 * Finishes the result partition.
 	 *
@@ -245,10 +199,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	public void finish() throws IOException {
 		checkInProduceState();
 
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.finish();
-		}
-
 		isFinished = true;
 	}
 
@@ -268,19 +218,15 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 				this.cause = cause;
 			}
 
-			// Release all subpartitions
-			for (ResultSubpartition subpartition : subpartitions) {
-				try {
-					subpartition.release();
-				}
-				// Catch this in order to ensure that release is called on all subpartitions
-				catch (Throwable t) {
-					LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
-				}
-			}
+			releaseInternal();
 		}
 	}
 
+	/**
+	 * Releases all produced data including both those stored in memory and persisted on disk.
+	 */
+	protected abstract void releaseInternal();
+
 	@Override
 	public void close() {
 		if (bufferPool != null) {
@@ -293,21 +239,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		partitionManager.releasePartition(partitionId, throwable);
 	}
 
-	/**
-	 * Returns the requested subpartition.
-	 */
-	@Override
-	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
-		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
-		checkState(!isReleased.get(), "Partition released.");
-
-		ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
-
-		LOG.debug("Created {}", readView);
-
-		return readView;
-	}
-
 	public Throwable getFailureCause() {
 		return cause;
 	}
@@ -346,7 +277,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	@Override
 	public String toString() {
 		return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
-				+ subpartitions.length + " subpartitions]";
+				+ numSubpartitions + " subpartitions]";
 	}
 
 	// ------------------------------------------------------------------------
@@ -364,13 +295,9 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 				this, subpartitionIndex);
 	}
 
-	public ResultSubpartition[] getAllPartitions() {
-		return subpartitions;
-	}
-
 	// ------------------------------------------------------------------------
 
-	private void checkInProduceState() throws IllegalStateException {
+	protected void checkInProduceState() throws IllegalStateException {
 		checkState(!isFinished, "Partition already finished.");
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
index 8a81dc8..fd1188c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
@@ -122,7 +122,7 @@ public class BoundedBlockingSubpartitionAvailabilityTest {
 	// ------------------------------------------------------------------------
 
 	private static ResultSubpartition createPartitionWithData(int numberOfBuffers) throws IOException {
-		ResultPartition parent = new ResultPartitionBuilder()
+		BoundedBlockingResultPartition parent = (BoundedBlockingResultPartition) new ResultPartitionBuilder()
 			.setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT)
 			.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
 			.setFileChannelManager(new FileChannelManagerImpl(new String[] { TMP_FOLDER.newFolder().toString() }, "data"))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
index aa209ec..559d13d 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -152,7 +152,7 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase {
 	}
 
 	private static ResultSubpartition createFileBoundedBlockingSubpartition() {
-		final ResultPartition resultPartition = new ResultPartitionBuilder()
+		final BoundedBlockingResultPartition resultPartition = (BoundedBlockingResultPartition) new ResultPartitionBuilder()
 			.setNetworkBufferSize(BUFFER_SIZE)
 			.setResultPartitionType(ResultPartitionType.BLOCKING)
 			.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index e770a65..b6fadf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -64,9 +64,9 @@ public class InputGateFairnessTest {
 		final int numberOfChannels = 37;
 		final int buffersPerChannel = 27;
 
-		ResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
-			.mapToObj(i -> new ResultPartitionBuilder().build())
-			.toArray(ResultPartition[]::new);
+		PipelinedResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
+			.mapToObj(i -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
+			.toArray(PipelinedResultPartition[]::new);
 		final BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(42);
 
 		// ----- create some source channels and fill them with buffers -----
@@ -124,9 +124,9 @@ public class InputGateFairnessTest {
 		final int numberOfChannels = 37;
 		final int buffersPerChannel = 27;
 
-		ResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
-			.mapToObj(i -> new ResultPartitionBuilder().build())
-			.toArray(ResultPartition[]::new);
+		PipelinedResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
+			.mapToObj(i -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
+			.toArray(PipelinedResultPartition[]::new);
 		try (BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(42)) {
 
 			// ----- create some source channels and fill them with one buffer each -----
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 61af900..903878b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -61,13 +61,13 @@ public class ResultPartitionFactoryTest extends TestLogger {
 
 	@Test
 	public void testBoundedBlockingSubpartitionsCreated() {
-		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
+		final BoundedBlockingResultPartition resultPartition = (BoundedBlockingResultPartition) createResultPartition(ResultPartitionType.BLOCKING);
 		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
 	}
 
 	@Test
 	public void testPipelinedSubpartitionsCreated() {
-		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
+		final PipelinedResultPartition resultPartition = (PipelinedResultPartition) createResultPartition(ResultPartitionType.PIPELINED);
 		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class)));
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 7f419ba..e293422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -95,7 +95,7 @@ public class ResultPartitionTest {
 		final int numSubpartitions = 3;
 
 		for (int i = 0; i < numPartitions; i++) {
-			final ResultPartition partition = new ResultPartitionBuilder()
+			final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
 				.setResultPartitionIndex(i)
 				.setNumberOfSubpartitions(numSubpartitions)
 				.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index d0bb400..6e65ad74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -461,7 +462,7 @@ public class LocalInputChannelTest {
 	public void testCheckpointingInflightData() throws Exception {
 		SingleInputGate inputGate = new SingleInputGateBuilder().build();
 
-		ResultPartition parent = PartitionTestUtils.createPartition(
+		PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
 			ResultPartitionType.PIPELINED,
 			NoOpFileChannelManager.INSTANCE);
 		ResultSubpartition subpartition = parent.getAllPartitions()[0];
@@ -501,7 +502,7 @@ public class LocalInputChannelTest {
 
 	private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) throws IOException {
 		int bufferSize = 4096;
-		ResultPartition parent = PartitionTestUtils.createPartition(
+		PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
 			ResultPartitionType.PIPELINED,
 			NoOpFileChannelManager.INSTANCE,
 			true,