You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:32 UTC

[flink] 07/10: [hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2375693f1cc80118bf61922f027942a0f101c486
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 19:01:31 2019 +0200

    [hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments
---
 .../flink/core/memory/MemorySegmentProvider.java   | 31 +++++++++++
 .../runtime/io/network/NetworkEnvironment.java     |  5 +-
 .../io/network/buffer/NetworkBufferPool.java       | 35 +++++++-----
 .../partition/consumer/RemoteInputChannel.java     |  5 +-
 .../partition/consumer/SingleInputGate.java        | 29 ++++------
 .../runtime/io/network/NetworkEnvironmentTest.java | 16 +++---
 .../io/network/buffer/BufferPoolFactoryTest.java   | 12 ++--
 .../network/buffer/LocalBufferPoolDestroyTest.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java     |  2 +-
 .../io/network/buffer/NetworkBufferPoolTest.java   | 65 ++++++++++------------
 ...editBasedPartitionRequestClientHandlerTest.java | 15 ++---
 .../netty/PartitionRequestClientHandlerTest.java   |  5 +-
 .../network/netty/PartitionRequestClientTest.java  | 12 ++--
 .../partition/consumer/LocalInputChannelTest.java  |  2 +-
 .../partition/consumer/RemoteInputChannelTest.java | 40 ++++++-------
 .../BackPressureStatsTrackerImplITCase.java        |  2 +-
 .../runtime/io/BarrierBufferMassiveRandomTest.java |  4 +-
 17 files changed, 150 insertions(+), 132 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
new file mode 100644
index 0000000..83b2d19
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The provider used for requesting and releasing batch of memory segments.
+ */
+public interface MemorySegmentProvider {
+	Collection<MemorySegment> requestMemorySegments() throws IOException;
+
+	void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d0e8263..97016ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -129,7 +129,8 @@ public class NetworkEnvironment {
 
 		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
 			config.numNetworkBuffers(),
-			config.networkBufferSize());
+			config.networkBufferSize(),
+			config.networkBuffersPerChannel());
 
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
@@ -246,7 +247,7 @@ public class NetworkEnvironment {
 					config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
 
 				// assign exclusive buffers to input channels directly and use the rest for floating buffers
-				gate.assignExclusiveSegments(networkBufferPool, config.networkBuffersPerChannel());
+				gate.assignExclusiveSegments(networkBufferPool);
 				bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
 			} else {
 				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 48ce27e..3d6c2da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.MathUtils;
 
@@ -31,6 +32,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -49,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the buffers for the network data transfer. When new local buffer pools are created, the
  * NetworkBufferPool dynamically redistributes the buffers between the pools.
  */
-public class NetworkBufferPool implements BufferPoolFactory {
+public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvider {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetworkBufferPool.class);
 
@@ -69,14 +71,19 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	private int numTotalRequiredBuffers;
 
+	private final int numberOfSegmentsToRequest;
+
 	/**
 	 * Allocates all {@link MemorySegment} instances managed by this pool.
 	 */
-	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
+	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest) {
 
 		this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
 		this.memorySegmentSize = segmentSize;
 
+		checkArgument(numberOfSegmentsToRequest > 0, "The number of required buffers should be larger than 0.");
+		this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;
+
 		final long sizeInLong = (long) segmentSize;
 
 		try {
@@ -126,20 +133,19 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		availableMemorySegments.add(checkNotNull(segment));
 	}
 
-	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
-		checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
-
+	@Override
+	public List<MemorySegment> requestMemorySegments() throws IOException {
 		synchronized (factoryLock) {
 			if (isDestroyed) {
 				throw new IllegalStateException("Network buffer pool has already been destroyed.");
 			}
 
-			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
+			if (numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments) {
 				throw new IOException(String.format("Insufficient number of network buffers: " +
 								"required %d, but only %d available. The total number of network " +
 								"buffers is currently set to %d of %d bytes each. You can increase this " +
 								"number by setting the configuration keys '%s', '%s', and '%s'.",
-						numRequiredBuffers,
+					numberOfSegmentsToRequest,
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
 						memorySegmentSize,
@@ -148,12 +154,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
 			}
 
-			this.numTotalRequiredBuffers += numRequiredBuffers;
+			this.numTotalRequiredBuffers += numberOfSegmentsToRequest;
 
 			try {
 				redistributeBuffers();
 			} catch (Throwable t) {
-				this.numTotalRequiredBuffers -= numRequiredBuffers;
+				this.numTotalRequiredBuffers -= numberOfSegmentsToRequest;
 
 				try {
 					redistributeBuffers();
@@ -164,9 +170,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			}
 		}
 
-		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
+		final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
 		try {
-			while (segments.size() < numRequiredBuffers) {
+			while (segments.size() < numberOfSegmentsToRequest) {
 				if (isDestroyed) {
 					throw new IllegalStateException("Buffer pool is destroyed.");
 				}
@@ -178,7 +184,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			}
 		} catch (Throwable e) {
 			try {
-				recycleMemorySegments(segments, numRequiredBuffers);
+				recycleMemorySegments(segments, numberOfSegmentsToRequest);
 			} catch (IOException inner) {
 				e.addSuppressed(inner);
 			}
@@ -188,11 +194,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		return segments;
 	}
 
-	public void recycleMemorySegments(List<MemorySegment> segments) throws IOException {
+	@Override
+	public void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException {
 		recycleMemorySegments(segments, segments.size());
 	}
 
-	private void recycleMemorySegments(List<MemorySegment> segments, int size) throws IOException {
+	private void recycleMemorySegments(Collection<MemorySegment> segments, int size) throws IOException {
 		synchronized (factoryLock) {
 			numTotalRequiredBuffers -= size;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 30246c0..98182c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -40,6 +40,7 @@ import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -122,12 +123,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 	 * Assigns exclusive buffers to this input channel, and this method should be called only once
 	 * after this input channel is created.
 	 */
-	void assignExclusiveSegments(List<MemorySegment> segments) {
+	void assignExclusiveSegments(Collection<MemorySegment> segments) {
 		checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have " +
 			"already been set for this input channel.");
 
 		checkNotNull(segments);
-		checkArgument(segments.size() > 0, "The number of exclusive buffers per channel should be larger than 0.");
+		checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");
 
 		this.initialCredit = segments.size();
 		this.numRequiredBuffers = segments.size();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 406416b..6b8cec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -30,7 +31,6 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
@@ -155,8 +155,8 @@ public class SingleInputGate extends InputGate {
 	 */
 	private BufferPool bufferPool;
 
-	/** Global network buffer pool to request and recycle exclusive buffers (only for credit-based). */
-	private NetworkBufferPool networkBufferPool;
+	/** Global memory segment provider to request and recycle exclusive buffers (only for credit-based). */
+	private MemorySegmentProvider memorySegmentProvider;
 
 	private final boolean isCreditBased;
 
@@ -172,9 +172,6 @@ public class SingleInputGate extends InputGate {
 
 	private int numberOfUninitializedChannels;
 
-	/** Number of network buffers to use for each remote input channel. */
-	private int networkBuffersPerChannel;
-
 	/** A timer to retrigger local partition requests. Only initialized if actually needed. */
 	private Timer retriggerLocalRequestTimer;
 
@@ -293,22 +290,20 @@ public class SingleInputGate extends InputGate {
 	/**
 	 * Assign the exclusive buffers to all remote input channels directly for credit-based mode.
 	 *
-	 * @param networkBufferPool The global pool to request and recycle exclusive buffers
-	 * @param networkBuffersPerChannel The number of exclusive buffers for each channel
+	 * @param memorySegmentProvider The global memory segment provider to request and recycle exclusive buffers
 	 */
-	public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
+	public void assignExclusiveSegments(MemorySegmentProvider memorySegmentProvider) throws IOException {
 		checkState(this.isCreditBased, "Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
-		checkState(this.networkBufferPool == null, "Bug in input gate setup logic: global buffer pool has" +
-			"already been set for this input gate.");
+		checkState(this.memorySegmentProvider == null,
+			"Bug in input gate setup logic: global memory segment provider has already been set for this input gate.");
 
-		this.networkBufferPool = checkNotNull(networkBufferPool);
-		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		this.memorySegmentProvider = checkNotNull(memorySegmentProvider);
 
 		synchronized (requestLock) {
 			for (InputChannel inputChannel : inputChannels.values()) {
 				if (inputChannel instanceof RemoteInputChannel) {
 					((RemoteInputChannel) inputChannel).assignExclusiveSegments(
-						networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
+						memorySegmentProvider.requestMemorySegments());
 				}
 			}
 		}
@@ -320,7 +315,7 @@ public class SingleInputGate extends InputGate {
 	 * @param segments The exclusive segments need to be recycled
 	 */
 	public void returnExclusiveSegments(List<MemorySegment> segments) throws IOException {
-		networkBufferPool.recycleMemorySegments(segments);
+		memorySegmentProvider.recycleMemorySegments(segments);
 	}
 
 	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
@@ -359,10 +354,10 @@ public class SingleInputGate extends InputGate {
 					newChannel = unknownChannel.toRemoteInputChannel(partitionLocation.getConnectionId());
 
 					if (this.isCreditBased) {
-						checkState(this.networkBufferPool != null, "Bug in input gate setup logic: " +
+						checkState(this.memorySegmentProvider != null, "Bug in input gate setup logic: " +
 							"global buffer pool has not been set for this input gate.");
 						((RemoteInputChannel) newChannel).assignExclusiveSegments(
-							networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
+							memorySegmentProvider.requestMemorySegments());
 					}
 				}
 				else {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 81cd8f4..c06bfa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -117,10 +117,10 @@ public class NetworkEnvironmentTest {
 		assertEquals(enableCreditBasedFlowControl ? 8 : 8 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
 
 		int invokations = enableCreditBasedFlowControl ? 1 : 0;
-		verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+		verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+		verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+		verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
 
 		for (ResultPartition rp : resultPartitions) {
 			rp.release();
@@ -243,10 +243,10 @@ public class NetworkEnvironmentTest {
 		assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
 
 		int invokations = enableCreditBasedFlowControl ? 1 : 0;
-		verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+		verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+		verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+		verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
 
 		for (ResultPartition rp : resultPartitions) {
 			rp.release();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 14eb6cd..bb6a645 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -54,7 +54,7 @@ public class BufferPoolFactoryTest {
 
 	@Before
 	public void setupNetworkBufferPool() {
-		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, 1);
 	}
 
 	@After
@@ -244,7 +244,7 @@ public class BufferPoolFactoryTest {
 
 	@Test
 	public void testUniformDistributionBounded3() throws IOException {
-		NetworkBufferPool globalPool = new NetworkBufferPool(3, 128);
+		NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, 1);
 		try {
 			BufferPool first = globalPool.createBufferPool(0, 10);
 			assertEquals(3, first.getNumBuffers());
@@ -277,12 +277,12 @@ public class BufferPoolFactoryTest {
 	 */
 	@Test
 	public void testUniformDistributionBounded4() throws IOException {
-		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 2);
 		try {
 			BufferPool first = globalPool.createBufferPool(0, 10);
 			assertEquals(10, first.getNumBuffers());
 
-			List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
+			List<MemorySegment> segmentList1 = globalPool.requestMemorySegments();
 			assertEquals(2, segmentList1.size());
 			assertEquals(8, first.getNumBuffers());
 
@@ -290,12 +290,12 @@ public class BufferPoolFactoryTest {
 			assertEquals(4, first.getNumBuffers());
 			assertEquals(4, second.getNumBuffers());
 
-			List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
+			List<MemorySegment> segmentList2 = globalPool.requestMemorySegments();
 			assertEquals(2, segmentList2.size());
 			assertEquals(3, first.getNumBuffers());
 			assertEquals(3, second.getNumBuffers());
 
-			List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
+			List<MemorySegment> segmentList3 = globalPool.requestMemorySegments();
 			assertEquals(2, segmentList3.size());
 			assertEquals(2, first.getNumBuffers());
 			assertEquals(2, second.getNumBuffers());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
index 5e8e42e..116884c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -48,7 +48,7 @@ public class LocalBufferPoolDestroyTest {
 		LocalBufferPool localBufferPool = null;
 
 		try {
-			networkBufferPool = new NetworkBufferPool(1, 4096);
+			networkBufferPool = new NetworkBufferPool(1, 4096, 1);
 			localBufferPool = new LocalBufferPool(networkBufferPool, 1);
 
 			// Drain buffer pool
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index a0e10d7..2b7fa7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -67,7 +67,7 @@ public class LocalBufferPoolTest extends TestLogger {
 
 	@Before
 	public void setupLocalBufferPool() {
-		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, 1);
 		localBufferPool = new LocalBufferPool(networkBufferPool, 1);
 
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 4e8eec6..89f673c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -60,7 +60,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 			final int bufferSize = 128;
 			final int numBuffers = 10;
 
-			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
+			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, 1);
 			assertEquals(bufferSize, globalPool.getMemorySegmentSize());
 			assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
 			assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
@@ -104,7 +104,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 	@Test
 	public void testDestroyAll() {
 		try {
-			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 1);
 
 			BufferPool fixedPool = globalPool.createBufferPool(2, 2);
 			BufferPool boundedPool = globalPool.createBufferPool(0, 1);
@@ -192,18 +192,18 @@ public class NetworkBufferPoolTest extends TestLogger {
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
+	 * Tests {@link NetworkBufferPool#requestMemorySegments()} with the {@link NetworkBufferPool}
 	 * currently containing the number of required free segments.
 	 */
 	@Test
 	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, numBuffers / 2);
 
 		List<MemorySegment> memorySegments = Collections.emptyList();
 		try {
-			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
+			memorySegments = globalPool.requestMemorySegments();
 			assertEquals(memorySegments.size(), numBuffers / 2);
 
 			globalPool.recycleMemorySegments(memorySegments);
@@ -216,17 +216,17 @@ public class NetworkBufferPoolTest extends TestLogger {
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
+	 * Tests {@link NetworkBufferPool#requestMemorySegments()} with the number of required
 	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
 	 */
 	@Test
 	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, numBuffers + 1);
 
 		try {
-			globalPool.requestMemorySegments(numBuffers + 1);
+			globalPool.requestMemorySegments();
 			fail("Should throw an IOException");
 		} catch (IOException e) {
 			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
@@ -236,35 +236,26 @@ public class NetworkBufferPoolTest extends TestLogger {
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
+	 * Tests {@link NetworkBufferPool} constructor with the invalid argument to
 	 * cause exception.
 	 */
-	@Test
-	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
-		final int numBuffers = 10;
-
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
-
-		try {
-			// the number of requested buffers should be larger than zero
-			globalPool.requestMemorySegments(0);
-			fail("Should throw an IllegalArgumentException");
-		} catch (IllegalArgumentException e) {
-			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
-		} finally {
-			globalPool.destroy();
-		}
+	@Test(expected = IllegalArgumentException.class)
+	public void testRequestMemorySegmentsWithInvalidArgument() {
+		// the number of requested buffers should be larger than zero
+		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 0);
+		globalPool.destroy();
+		fail("Should throw an IllegalArgumentException");
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
+	 * Tests {@link NetworkBufferPool#requestMemorySegments()} with the {@link NetworkBufferPool}
 	 * currently not containing the number of required free segments (currently occupied by a buffer pool).
 	 */
 	@Test
 	public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
 		final int numBuffers = 10;
 
-		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, numBuffers / 2);
 
 		final List<Buffer> buffers = new ArrayList<>(numBuffers);
 		List<MemorySegment> memorySegments = Collections.emptyList();
@@ -298,7 +289,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
 			// take more buffers than are freely available at the moment via requestMemorySegments()
 			isRunning.await();
-			memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
+			memorySegments = networkBufferPool.requestMemorySegments();
 			assertThat(memorySegments, not(hasItem(nullValue())));
 		} finally {
 			if (bufferRecycler != null) {
@@ -313,14 +304,14 @@ public class NetworkBufferPoolTest extends TestLogger {
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with an exception occurring during
+	 * Tests {@link NetworkBufferPool#requestMemorySegments()} with an exception occurring during
 	 * the call to {@link NetworkBufferPool#redistributeBuffers()}.
 	 */
 	@Test
 	public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throws IOException {
 		final int numBuffers = 3;
 
-		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, 2);
 
 		final List<Buffer> buffers = new ArrayList<>(numBuffers);
 		List<MemorySegment> memorySegments = Collections.emptyList();
@@ -339,7 +330,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 			}
 
 			// this will ask the buffer pool to release its excess buffers which should fail
-			memorySegments = networkBufferPool.requestMemorySegments(2);
+			memorySegments = networkBufferPool.requestMemorySegments();
 			fail("Requesting memory segments should have thrown during buffer pool redistribution.");
 		} catch (TestIOException e) {
 			// test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
@@ -361,7 +352,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 	@Test
 	public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
 		final int numBuffers = 3;
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, 1);
 
 		final List<Buffer> buffers = new ArrayList<>(numBuffers);
 		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers,
@@ -406,14 +397,14 @@ public class NetworkBufferPoolTest extends TestLogger {
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
+	 * Tests {@link NetworkBufferPool#requestMemorySegments()}, verifying it may be aborted in
 	 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 	 */
 	@Test
 	public void testRequestMemorySegmentsInterruptable() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, 10);
 		MemorySegment segment = globalPool.requestMemorySegment();
 		assertNotNull(segment);
 
@@ -422,7 +413,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 			@Override
 			public void go() throws Exception {
 				isRunning.trigger();
-				globalPool.requestMemorySegments(10);
+				globalPool.requestMemorySegments();
 			}
 		};
 		asyncRequest.start();
@@ -445,14 +436,14 @@ public class NetworkBufferPoolTest extends TestLogger {
 	}
 
 	/**
-	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
+	 * Tests {@link NetworkBufferPool#requestMemorySegments()}, verifying it may be aborted and
 	 * remains in a defined state even if the waiting is interrupted.
 	 */
 	@Test
 	public void testRequestMemorySegmentsInterruptable2() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, 10);
 		MemorySegment segment = globalPool.requestMemorySegment();
 		assertNotNull(segment);
 
@@ -461,7 +452,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 			@Override
 			public void go() throws Exception {
 				isRunning.trigger();
-				globalPool.requestMemorySegments(10);
+				globalPool.requestMemorySegments();
 			}
 		};
 		asyncRequest.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 8cdf221..f2808fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -136,14 +136,13 @@ public class CreditBasedPartitionRequestClientHandlerTest {
 	 */
 	@Test
 	public void testReceiveBuffer() throws Exception {
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
 			inputGate.setBufferPool(bufferPool);
-			final int numExclusiveBuffers = 2;
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 
 			final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
 			handler.addInputChannel(inputChannel);
@@ -245,15 +244,14 @@ public class CreditBasedPartitionRequestClientHandlerTest {
 		final PartitionRequestClient client = new PartitionRequestClient(
 			channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
 
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel1 = createRemoteInputChannel(inputGate, client);
 		final RemoteInputChannel inputChannel2 = createRemoteInputChannel(inputGate, client);
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
 			inputGate.setBufferPool(bufferPool);
-			final int numExclusiveBuffers = 2;
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 
 			inputChannel1.requestSubpartition(0);
 			inputChannel2.requestSubpartition(0);
@@ -346,14 +344,13 @@ public class CreditBasedPartitionRequestClientHandlerTest {
 		final PartitionRequestClient client = new PartitionRequestClient(
 			channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
 
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
 			inputGate.setBufferPool(bufferPool);
-			final int numExclusiveBuffers = 2;
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 
 			inputChannel.requestSubpartition(0);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 16a2415..9017bf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -127,14 +127,13 @@ public class PartitionRequestClientHandlerTest {
 	 */
 	@Test
 	public void testReceiveBuffer() throws Exception {
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
 			inputGate.setBufferPool(bufferPool);
-			final int numExclusiveBuffers = 2;
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 
 			final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
 			handler.addInputChannel(inputChannel);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
index 91a9d5a..6b5560e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
@@ -52,15 +52,15 @@ public class PartitionRequestClientTest {
 		final PartitionRequestClient client = new PartitionRequestClient(
 			channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
 
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, numExclusiveBuffers);
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client, 1, 2);
 
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
 			inputGate.setBufferPool(bufferPool);
-			final int numExclusiveBuffers = 2;
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 
 			// first subpartition request
 			inputChannel.requestSubpartition(0);
@@ -106,15 +106,15 @@ public class PartitionRequestClientTest {
 		final PartitionRequestClient client = new PartitionRequestClient(
 			channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
 
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, numExclusiveBuffers);
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
 
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
 			inputGate.setBufferPool(bufferPool);
-			final int numExclusiveBuffers = 2;
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			// The input channel should only send one partition request
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 0c25b24..a6ff087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -98,7 +98,7 @@ public class LocalInputChannelTest {
 
 		final NetworkBufferPool networkBuffers = new NetworkBufferPool(
 			(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
-			TestBufferFactory.BUFFER_SIZE);
+			TestBufferFactory.BUFFER_SIZE, 1);
 
 		final ResultPartitionManager partitionManager = new ResultPartitionManager();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 67bc6f2..935ddb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -325,8 +325,7 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
-		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32, 2);
 		final int numFloatingBuffers = 14;
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
@@ -335,7 +334,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			// Prepare the exclusive and floating buffers to verify recycle logic later
@@ -464,8 +463,7 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
-		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32, 2);
 		final int numFloatingBuffers = 14;
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
@@ -474,7 +472,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			// Prepare the exclusive and floating buffers to verify recycle logic later
@@ -539,8 +537,7 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
-		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32, 2);
 		final int numFloatingBuffers = 14;
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
@@ -549,7 +546,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			// Prepare the exclusive and floating buffers to verify recycle logic later
@@ -628,8 +625,8 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testFairDistributionFloatingBuffers() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32);
 		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, numExclusiveBuffers);
 		final int numFloatingBuffers = 3;
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
@@ -640,7 +637,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			channel1.requestSubpartition(0);
 			channel2.requestSubpartition(0);
 			channel3.requestSubpartition(0);
@@ -695,11 +692,11 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testFailureInNotifyBufferAvailable() throws Exception {
 		// Setup
-		final int numExclusiveBuffers = 0;
+		final int numExclusiveBuffers = 1;
 		final int numFloatingBuffers = 1;
 		final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
 		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-			numTotalBuffers, 32);
+			numTotalBuffers, 32, numExclusiveBuffers);
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
@@ -757,8 +754,7 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32);
-		final int numExclusiveBuffers = 2;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32, 2);
 		final int numFloatingBuffers = 128;
 
 		final ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -769,7 +765,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			final Callable<Void> requestBufferTask = new Callable<Void>() {
@@ -818,8 +814,8 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
 		final int numExclusiveSegments = 120;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32, numExclusiveSegments);
 		final int numFloatingBuffers = 128;
 		final int backlog = 128;
 
@@ -831,7 +827,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveSegments);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			final Callable<Void> requestBufferTask = new Callable<Void>() {
@@ -870,8 +866,8 @@ public class RemoteInputChannelTest {
 	@Test
 	public void testConcurrentRecycleAndRelease() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
 		final int numExclusiveSegments = 120;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32, numExclusiveSegments);
 		final int numFloatingBuffers = 128;
 
 		final ExecutorService executor = Executors.newFixedThreadPool(3);
@@ -882,7 +878,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveSegments);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			final Callable<Void> releaseTask = new Callable<Void>() {
@@ -926,7 +922,7 @@ public class RemoteInputChannelTest {
 		final int numFloatingBuffers = 2;
 		final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
 		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-			numTotalBuffers, 32);
+			numTotalBuffers, 32, numExclusiveBuffers);
 
 		final ExecutorService executor = Executors.newFixedThreadPool(2);
 
@@ -936,7 +932,7 @@ public class RemoteInputChannelTest {
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
 			inputGate.setBufferPool(bufferPool);
-			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+			inputGate.assignExclusiveSegments(networkBufferPool);
 			inputChannel.requestSubpartition(0);
 
 			final Callable<Void> bufferPoolInteractionsTask = () -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 53fabe1..8a5a1a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -92,7 +92,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger {
 
 	@Before
 	public void setUp() throws Exception {
-		networkBufferPool = new NetworkBufferPool(100, 8192);
+		networkBufferPool = new NetworkBufferPool(100, 8192, 1);
 		testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 
 		final Configuration configuration = new Configuration();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index b1a3ad5..2ea957d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -53,8 +53,8 @@ public class BarrierBufferMassiveRandomTest {
 		try {
 			ioMan = new IOManagerAsync();
 
-			networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE);
-			networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE);
+			networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE, 1);
+			networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE, 1);
 			BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100);
 			BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100);