You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/29 10:23:45 UTC

[flink] branch release-1.9 updated (5077f18 -> 7116ab7)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 5077f18  [FLINK-13387][WebUI] Fix log download for old UI
     new 526236a  [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request
     new f738c4e  [FLINK-13245][network] Make subpartition consumption notification independant
     new 7116ab7  [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/netty/PartitionRequestQueue.java    | 43 +++-------
 .../ReleaseOnConsumptionResultPartition.java       | 42 +++++++---
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++++++++++++++++++++++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 +++++++++-
 6 files changed, 182 insertions(+), 48 deletions(-)


[flink] 02/03: [FLINK-13245][network] Make subpartition consumption notification independant

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f738c4edd5858891236a41fc1e453b06cf815089
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Jul 26 11:25:40 2019 +0200

    [FLINK-13245][network] Make subpartition consumption notification independant
---
 .../ReleaseOnConsumptionResultPartition.java       | 42 +++++++++++++++-------
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 ++++++++++++++++++---
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
index 19ec681..766f500 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -31,11 +30,18 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class ReleaseOnConsumptionResultPartition extends ResultPartition {
 
+	private static final Object lock = new Object();
+
+	/**
+	 * A flag for each subpartition indicating whether it was already consumed or not.
+	 */
+	private final boolean[] consumedSubpartitions;
+
 	/**
 	 * The total number of references to subpartitions of this result. The result partition can be
 	 * safely released, iff the reference count is zero.
 	 */
-	private final AtomicInteger pendingReferences = new AtomicInteger();
+	private int numUnconsumedSubpartitions;
 
 	ReleaseOnConsumptionResultPartition(
 			String owningTaskName,
@@ -47,12 +53,13 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition {
 			FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 		super(owningTaskName, partitionId, partitionType, subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
 
-		pendingReferences.set(subpartitions.length);
+		this.consumedSubpartitions = new boolean[subpartitions.length];
+		this.numUnconsumedSubpartitions = subpartitions.length;
 	}
 
 	@Override
 	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
-		checkState(pendingReferences.get() > 0, "Partition not pinned.");
+		checkState(numUnconsumedSubpartitions > 0, "Partition not pinned.");
 
 		return super.createSubpartitionView(index, availabilityListener);
 	}
@@ -63,22 +70,33 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition {
 			return;
 		}
 
-		int refCnt = pendingReferences.decrementAndGet();
+		final int remainingUnconsumed;
 
-		if (refCnt == 0) {
-			partitionManager.onConsumedPartition(this);
-		} else if (refCnt < 0) {
-			throw new IllegalStateException("All references released.");
+		// we synchronize only the bookkeeping section, to avoid holding the lock during any
+		// calls into other components
+		synchronized (lock) {
+			if (consumedSubpartitions[subpartitionIndex]) {
+				// repeated call - ignore
+				return;
+			}
+
+			consumedSubpartitions[subpartitionIndex] = true;
+			remainingUnconsumed = (--numUnconsumedSubpartitions);
 		}
 
-		LOG.debug("{}: Received release notification for subpartition {}.",
-			this, subpartitionIndex);
+		LOG.debug("{}: Received consumed notification for subpartition {}.", this, subpartitionIndex);
+
+		if (remainingUnconsumed == 0) {
+			partitionManager.onConsumedPartition(this);
+		} else if (remainingUnconsumed < 0) {
+			throw new IllegalStateException("Received consume notification even though all subpartitions are already consumed.");
+		}
 	}
 
 	@Override
 	public String toString() {
 		return "ReleaseOnConsumptionResultPartition " + partitionId.toString() + " [" + partitionType + ", "
 			+ subpartitions.length + " subpartitions, "
-			+ pendingReferences + " pending references]";
+			+ numUnconsumedSubpartitions + " pending consumptions]";
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
index afcd095..ce7467f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link ReleaseOnConsumptionResultPartitionTest}.
@@ -41,9 +41,42 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
 		manager.registerResultPartition(partition);
 
 		partition.onConsumedSubpartition(0);
-		assertThat(partition.isReleased(), is(false));
+		assertFalse(partition.isReleased());
 
 		partition.onConsumedSubpartition(1);
-		assertThat(partition.isReleased(), is(true));
+		assertTrue(partition.isReleased());
+	}
+
+	@Test
+	public void testMultipleReleaseCallsAreIdempotent() {
+		final ResultPartitionManager manager = new ResultPartitionManager();
+		final ResultPartition partition = new ResultPartitionBuilder()
+			.setNumberOfSubpartitions(2)
+			.isReleasedOnConsumption(true)
+			.setResultPartitionManager(manager)
+			.build();
+		manager.registerResultPartition(partition);
+
+		partition.onConsumedSubpartition(0);
+		partition.onConsumedSubpartition(0);
+
+		assertFalse(partition.isReleased());
+	}
+
+	@Test
+	public void testReleaseAfterIdempotentCalls() {
+		final ResultPartitionManager manager = new ResultPartitionManager();
+		final ResultPartition partition = new ResultPartitionBuilder()
+			.setNumberOfSubpartitions(2)
+			.isReleasedOnConsumption(true)
+			.setResultPartitionManager(manager)
+			.build();
+		manager.registerResultPartition(partition);
+
+		partition.onConsumedSubpartition(0);
+		partition.onConsumedSubpartition(0);
+		partition.onConsumedSubpartition(1);
+
+		assertTrue(partition.isReleased());
 	}
 }


[flink] 03/03: [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7116ab71edc183d34d128453e06a3efc15ad8905
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Jul 26 11:50:55 2019 +0200

    [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs
---
 .../runtime/io/network/netty/PartitionRequestQueue.java  | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index b492ea6..4a845d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -62,8 +60,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	/** All the readers created for the consumers' partition requests. */
 	private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>();
 
-	private final Set<InputChannelID> released = Sets.newHashSet();
-
 	private boolean fatalError;
 
 	private ChannelHandlerContext ctx;
@@ -175,9 +171,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		} else if (msg.getClass() == InputChannelID.class) {
 			// Release partition view that get a cancel request.
 			InputChannelID toCancel = (InputChannelID) msg;
-			if (released.contains(toCancel)) {
-				return;
-			}
 
 			// remove reader from queue of available readers
 			availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
@@ -222,7 +215,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 					if (!reader.isReleased()) {
 						continue;
 					}
-					markAsReleased(reader.getReceiverId());
 
 					Throwable cause = reader.getFailureCause();
 					if (cause != null) {
@@ -312,14 +304,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		reader.notifySubpartitionConsumed();
 		reader.setRegisteredAsAvailable(false);
 		reader.releaseAllResources();
-		markAsReleased(reader.getReceiverId());
-	}
-
-	/**
-	 * Marks a receiver as released.
-	 */
-	private void markAsReleased(InputChannelID receiverId) {
-		released.add(receiverId);
 	}
 
 	// This listener is called after an element of the current nonEmptyReader has been


[flink] 01/03: [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 526236a4d537e78dbac4c575611ef52b602923d9
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue Jul 16 23:00:09 2019 +0800

    [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request
    
    On producer side the netty handler receives the CancelPartitionRequest for releasing the SubpartitionView resource.
    In previous implementation we try to find the corresponding view via available queue in PartitionRequestQueue. But
    in reality the view is not always available to stay in this queue, then the view would never be released.
    
    Furthermore the release of ResultPartition/ResultSubpartitions is based on the reference counter in ReleaseOnConsumptionResultPartition,
    but while handling the CancelPartitionRequest in PartitionRequestQueue, the ReleaseOnConsumptionResultPartition is never
    notified of consumed subpartition. That means the reference counter would never decrease to 0 to trigger partition release,
    which would bring file resource leak in the case of BoundedBlockingSubpartition.
    
    In order to fix above two issues, the corresponding view is released via all reader queue instead, and then it would call
    ReleaseOnConsumptionResultPartition#onConsumedSubpartition meanwhile to solve this bug.
---
 .../io/network/netty/PartitionRequestQueue.java    | 33 ++++----
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++++++++++++++++++++++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index f82a42f..b492ea6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -138,9 +138,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		}
 
 		for (NetworkSequenceViewReader reader : allReaders.values()) {
-			reader.notifySubpartitionConsumed();
-			reader.releaseAllResources();
-			markAsReleased(reader.getReceiverId());
+			releaseViewReader(reader);
 		}
 		allReaders.clear();
 	}
@@ -181,19 +179,14 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 				return;
 			}
 
-			// Cancel the request for the input channel
-			int size = availableReaders.size();
-			for (int i = 0; i < size; i++) {
-				NetworkSequenceViewReader reader = pollAvailableReader();
-				if (reader.getReceiverId().equals(toCancel)) {
-					reader.releaseAllResources();
-					markAsReleased(reader.getReceiverId());
-				} else {
-					registerAvailableReader(reader);
-				}
-			}
+			// remove reader from queue of available readers
+			availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
 
-			allReaders.remove(toCancel);
+			// remove reader from queue of all readers and release its resource
+			final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
+			if (toRelease != null) {
+				releaseViewReader(toRelease);
+			}
 		} else {
 			ctx.fireUserEventTriggered(msg);
 		}
@@ -308,14 +301,20 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	private void releaseAllResources() throws IOException {
 		// note: this is only ever executed by one thread: the Netty IO thread!
 		for (NetworkSequenceViewReader reader : allReaders.values()) {
-			reader.releaseAllResources();
-			markAsReleased(reader.getReceiverId());
+			releaseViewReader(reader);
 		}
 
 		availableReaders.clear();
 		allReaders.clear();
 	}
 
+	private void releaseViewReader(NetworkSequenceViewReader reader) throws IOException {
+		reader.notifySubpartitionConsumed();
+		reader.setRegisteredAsAvailable(false);
+		reader.releaseAllResources();
+		markAsReleased(reader.getReceiverId());
+	}
+
 	/**
 	 * Marks a receiver as released.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index eca8263..58c02df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -109,7 +109,7 @@ public class CancelPartitionRequestTest {
 			}
 
 			verify(view, times(1)).releaseAllResources();
-			verify(view, times(0)).notifySubpartitionConsumed();
+			verify(view, times(1)).notifySubpartitionConsumed();
 		}
 		finally {
 			shutdown(serverAndClient);
@@ -168,7 +168,7 @@ public class CancelPartitionRequestTest {
 			NettyTestUtil.awaitClose(ch);
 
 			verify(view, times(1)).releaseAllResources();
-			verify(view, times(0)).notifySubpartitionConsumed();
+			verify(view, times(1)).notifySubpartitionConsumed();
 		}
 		finally {
 			shutdown(serverAndClient);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index a9e8662..a0dbd7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -19,10 +19,18 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -32,7 +40,11 @@ import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
@@ -54,6 +66,24 @@ import static org.junit.Assert.assertTrue;
  */
 public class PartitionRequestQueueTest {
 
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final int BUFFER_SIZE = 1024 * 1024;
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		fileChannelManager = new FileChannelManagerImpl(
+			new String[] {TEMPORARY_FOLDER.newFolder().getAbsolutePath()}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	/**
 	 * In case of enqueuing an empty reader and a reader that actually has some buffers when channel is not writable,
 	 * on channelWritability change event should result in reading all of the messages.
@@ -348,6 +378,68 @@ public class PartitionRequestQueueTest {
 		assertNull(channel.readOutbound());
 	}
 
+	@Test
+	public void testCancelPartitionRequestForUnavailableView() throws Exception {
+		testCancelPartitionRequest(false);
+	}
+
+	@Test
+	public void testCancelPartitionRequestForAvailableView() throws Exception {
+		testCancelPartitionRequest(true);
+	}
+
+	private void testCancelPartitionRequest(boolean isAvailableView) throws Exception {
+		// setup
+		final ResultPartitionManager partitionManager = new ResultPartitionManager();
+		final ResultPartition partition = createFinishedPartitionWithFilledData(partitionManager);
+		final InputChannelID receiverId = new InputChannelID();
+		final PartitionRequestQueue queue = new PartitionRequestQueue();
+		final CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
+		final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+		reader.requestSubpartitionView(partitionManager, partition.getPartitionId(), 0);
+		// add this reader into allReaders queue
+		queue.notifyReaderCreated(reader);
+
+		// block the channel so that we see an intermediate state in the test
+		blockChannel(channel);
+
+		// add credit to make this reader available for adding into availableReaders queue
+		if (isAvailableView) {
+			queue.addCredit(receiverId, 1);
+			assertTrue(queue.getAvailableReaders().contains(reader));
+		}
+
+		// cancel this subpartition view
+		queue.cancel(receiverId);
+		channel.runPendingTasks();
+
+		assertFalse(queue.getAvailableReaders().contains(reader));
+		// the partition and its reader view should all be released
+		assertTrue(reader.isReleased());
+		assertTrue(partition.isReleased());
+		for (ResultSubpartition subpartition : partition.getAllPartitions()) {
+			assertTrue(subpartition.isReleased());
+		}
+
+		// cleanup
+		channel.close();
+	}
+
+	private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception {
+		final ResultPartition partition = new ResultPartitionBuilder()
+			.setResultPartitionType(ResultPartitionType.BLOCKING)
+			.setFileChannelManager(fileChannelManager)
+			.setResultPartitionManager(partitionManager)
+			.isReleasedOnConsumption(true)
+			.build();
+
+		partitionManager.registerResultPartition(partition);
+		PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
+
+		return partition;
+	}
+
 	/**
 	 * Blocks the given channel by adding a buffer that is bigger than the high watermark.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 5e39a43..5f80421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -158,4 +159,11 @@ public class PartitionTestUtils {
 			true,
 			releaseType);
 	}
+
+	public static void writeBuffers(ResultPartition partition, int numberOfBuffers, int bufferSize) throws IOException {
+		for (int i = 0; i < numberOfBuffers; i++) {
+			partition.addBufferConsumer(createFilledBufferConsumer(bufferSize, bufferSize), 0);
+		}
+		partition.finish();
+	}
 }