You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:32 UTC
[flink] 07/10: [hotfix][network] Introduce MemorySegmentProvider
for RemoteInputChannel to assign exclusive segments
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2375693f1cc80118bf61922f027942a0f101c486
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 19:01:31 2019 +0200
[hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments
---
.../flink/core/memory/MemorySegmentProvider.java | 31 +++++++++++
.../runtime/io/network/NetworkEnvironment.java | 5 +-
.../io/network/buffer/NetworkBufferPool.java | 35 +++++++-----
.../partition/consumer/RemoteInputChannel.java | 5 +-
.../partition/consumer/SingleInputGate.java | 29 ++++------
.../runtime/io/network/NetworkEnvironmentTest.java | 16 +++---
.../io/network/buffer/BufferPoolFactoryTest.java | 12 ++--
.../network/buffer/LocalBufferPoolDestroyTest.java | 2 +-
.../io/network/buffer/LocalBufferPoolTest.java | 2 +-
.../io/network/buffer/NetworkBufferPoolTest.java | 65 ++++++++++------------
...editBasedPartitionRequestClientHandlerTest.java | 15 ++---
.../netty/PartitionRequestClientHandlerTest.java | 5 +-
.../network/netty/PartitionRequestClientTest.java | 12 ++--
.../partition/consumer/LocalInputChannelTest.java | 2 +-
.../partition/consumer/RemoteInputChannelTest.java | 40 ++++++-------
.../BackPressureStatsTrackerImplITCase.java | 2 +-
.../runtime/io/BarrierBufferMassiveRandomTest.java | 4 +-
17 files changed, 150 insertions(+), 132 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
new file mode 100644
index 0000000..83b2d19
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The provider used for requesting and releasing batches of memory segments.
+ */
+public interface MemorySegmentProvider {
+ Collection<MemorySegment> requestMemorySegments() throws IOException;
+
+ void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d0e8263..97016ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -129,7 +129,8 @@ public class NetworkEnvironment {
NetworkBufferPool networkBufferPool = new NetworkBufferPool(
config.numNetworkBuffers(),
- config.networkBufferSize());
+ config.networkBufferSize(),
+ config.networkBuffersPerChannel());
registerNetworkMetrics(metricGroup, networkBufferPool);
@@ -246,7 +247,7 @@ public class NetworkEnvironment {
config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
// assign exclusive buffers to input channels directly and use the rest for floating buffers
- gate.assignExclusiveSegments(networkBufferPool, config.networkBuffersPerChannel());
+ gate.assignExclusiveSegments(networkBufferPool);
bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
} else {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 48ce27e..3d6c2da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MathUtils;
@@ -31,6 +32,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -49,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* the buffers for the network data transfer. When new local buffer pools are created, the
* NetworkBufferPool dynamically redistributes the buffers between the pools.
*/
-public class NetworkBufferPool implements BufferPoolFactory {
+public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvider {
private static final Logger LOG = LoggerFactory.getLogger(NetworkBufferPool.class);
@@ -69,14 +71,19 @@ public class NetworkBufferPool implements BufferPoolFactory {
private int numTotalRequiredBuffers;
+ private final int numberOfSegmentsToRequest;
+
/**
* Allocates all {@link MemorySegment} instances managed by this pool.
*/
- public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
+ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest) {
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
this.memorySegmentSize = segmentSize;
+ checkArgument(numberOfSegmentsToRequest > 0, "The number of required buffers should be larger than 0.");
+ this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;
+
final long sizeInLong = (long) segmentSize;
try {
@@ -126,20 +133,19 @@ public class NetworkBufferPool implements BufferPoolFactory {
availableMemorySegments.add(checkNotNull(segment));
}
- public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
- checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
-
+ @Override
+ public List<MemorySegment> requestMemorySegments() throws IOException {
synchronized (factoryLock) {
if (isDestroyed) {
throw new IllegalStateException("Network buffer pool has already been destroyed.");
}
- if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
+ if (numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments) {
throw new IOException(String.format("Insufficient number of network buffers: " +
"required %d, but only %d available. The total number of network " +
"buffers is currently set to %d of %d bytes each. You can increase this " +
"number by setting the configuration keys '%s', '%s', and '%s'.",
- numRequiredBuffers,
+ numberOfSegmentsToRequest,
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
@@ -148,12 +154,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}
- this.numTotalRequiredBuffers += numRequiredBuffers;
+ this.numTotalRequiredBuffers += numberOfSegmentsToRequest;
try {
redistributeBuffers();
} catch (Throwable t) {
- this.numTotalRequiredBuffers -= numRequiredBuffers;
+ this.numTotalRequiredBuffers -= numberOfSegmentsToRequest;
try {
redistributeBuffers();
@@ -164,9 +170,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
- final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
+ final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
try {
- while (segments.size() < numRequiredBuffers) {
+ while (segments.size() < numberOfSegmentsToRequest) {
if (isDestroyed) {
throw new IllegalStateException("Buffer pool is destroyed.");
}
@@ -178,7 +184,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
} catch (Throwable e) {
try {
- recycleMemorySegments(segments, numRequiredBuffers);
+ recycleMemorySegments(segments, numberOfSegmentsToRequest);
} catch (IOException inner) {
e.addSuppressed(inner);
}
@@ -188,11 +194,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
return segments;
}
- public void recycleMemorySegments(List<MemorySegment> segments) throws IOException {
+ @Override
+ public void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException {
recycleMemorySegments(segments, segments.size());
}
- private void recycleMemorySegments(List<MemorySegment> segments, int size) throws IOException {
+ private void recycleMemorySegments(Collection<MemorySegment> segments, int size) throws IOException {
synchronized (factoryLock) {
numTotalRequiredBuffers -= size;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 30246c0..98182c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -40,6 +40,7 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -122,12 +123,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
* Assigns exclusive buffers to this input channel, and this method should be called only once
* after this input channel is created.
*/
- void assignExclusiveSegments(List<MemorySegment> segments) {
+ void assignExclusiveSegments(Collection<MemorySegment> segments) {
checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have " +
"already been set for this input channel.");
checkNotNull(segments);
- checkArgument(segments.size() > 0, "The number of exclusive buffers per channel should be larger than 0.");
+ checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");
this.initialCredit = segments.size();
this.numRequiredBuffers = segments.size();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 406416b..6b8cec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -30,7 +31,6 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
@@ -155,8 +155,8 @@ public class SingleInputGate extends InputGate {
*/
private BufferPool bufferPool;
- /** Global network buffer pool to request and recycle exclusive buffers (only for credit-based). */
- private NetworkBufferPool networkBufferPool;
+ /** Global memory segment provider to request and recycle exclusive buffers (only for credit-based). */
+ private MemorySegmentProvider memorySegmentProvider;
private final boolean isCreditBased;
@@ -172,9 +172,6 @@ public class SingleInputGate extends InputGate {
private int numberOfUninitializedChannels;
- /** Number of network buffers to use for each remote input channel. */
- private int networkBuffersPerChannel;
-
/** A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;
@@ -293,22 +290,20 @@ public class SingleInputGate extends InputGate {
/**
* Assign the exclusive buffers to all remote input channels directly for credit-based mode.
*
- * @param networkBufferPool The global pool to request and recycle exclusive buffers
- * @param networkBuffersPerChannel The number of exclusive buffers for each channel
+ * @param memorySegmentProvider The global memory segment provider to request and recycle exclusive buffers
*/
- public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
+ public void assignExclusiveSegments(MemorySegmentProvider memorySegmentProvider) throws IOException {
checkState(this.isCreditBased, "Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
- checkState(this.networkBufferPool == null, "Bug in input gate setup logic: global buffer pool has" +
- "already been set for this input gate.");
+ checkState(this.memorySegmentProvider == null,
+ "Bug in input gate setup logic: global memory segment provider has already been set for this input gate.");
- this.networkBufferPool = checkNotNull(networkBufferPool);
- this.networkBuffersPerChannel = networkBuffersPerChannel;
+ this.memorySegmentProvider = checkNotNull(memorySegmentProvider);
synchronized (requestLock) {
for (InputChannel inputChannel : inputChannels.values()) {
if (inputChannel instanceof RemoteInputChannel) {
((RemoteInputChannel) inputChannel).assignExclusiveSegments(
- networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
+ memorySegmentProvider.requestMemorySegments());
}
}
}
@@ -320,7 +315,7 @@ public class SingleInputGate extends InputGate {
* @param segments The exclusive segments need to be recycled
*/
public void returnExclusiveSegments(List<MemorySegment> segments) throws IOException {
- networkBufferPool.recycleMemorySegments(segments);
+ memorySegmentProvider.recycleMemorySegments(segments);
}
public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
@@ -359,10 +354,10 @@ public class SingleInputGate extends InputGate {
newChannel = unknownChannel.toRemoteInputChannel(partitionLocation.getConnectionId());
if (this.isCreditBased) {
- checkState(this.networkBufferPool != null, "Bug in input gate setup logic: " +
+ checkState(this.memorySegmentProvider != null, "Bug in input gate setup logic: " +
"global buffer pool has not been set for this input gate.");
((RemoteInputChannel) newChannel).assignExclusiveSegments(
- networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
+ memorySegmentProvider.requestMemorySegments());
}
}
else {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 81cd8f4..c06bfa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -117,10 +117,10 @@ public class NetworkEnvironmentTest {
assertEquals(enableCreditBasedFlowControl ? 8 : 8 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
int invokations = enableCreditBasedFlowControl ? 1 : 0;
- verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+ verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+ verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+ verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
for (ResultPartition rp : resultPartitions) {
rp.release();
@@ -243,10 +243,10 @@ public class NetworkEnvironmentTest {
assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
int invokations = enableCreditBasedFlowControl ? 1 : 0;
- verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+ verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+ verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
+ verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool());
for (ResultPartition rp : resultPartitions) {
rp.release();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 14eb6cd..bb6a645 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -54,7 +54,7 @@ public class BufferPoolFactoryTest {
@Before
public void setupNetworkBufferPool() {
- networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+ networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, 1);
}
@After
@@ -244,7 +244,7 @@ public class BufferPoolFactoryTest {
@Test
public void testUniformDistributionBounded3() throws IOException {
- NetworkBufferPool globalPool = new NetworkBufferPool(3, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, 1);
try {
BufferPool first = globalPool.createBufferPool(0, 10);
assertEquals(3, first.getNumBuffers());
@@ -277,12 +277,12 @@ public class BufferPoolFactoryTest {
*/
@Test
public void testUniformDistributionBounded4() throws IOException {
- NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 2);
try {
BufferPool first = globalPool.createBufferPool(0, 10);
assertEquals(10, first.getNumBuffers());
- List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
+ List<MemorySegment> segmentList1 = globalPool.requestMemorySegments();
assertEquals(2, segmentList1.size());
assertEquals(8, first.getNumBuffers());
@@ -290,12 +290,12 @@ public class BufferPoolFactoryTest {
assertEquals(4, first.getNumBuffers());
assertEquals(4, second.getNumBuffers());
- List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
+ List<MemorySegment> segmentList2 = globalPool.requestMemorySegments();
assertEquals(2, segmentList2.size());
assertEquals(3, first.getNumBuffers());
assertEquals(3, second.getNumBuffers());
- List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
+ List<MemorySegment> segmentList3 = globalPool.requestMemorySegments();
assertEquals(2, segmentList3.size());
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
index 5e8e42e..116884c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -48,7 +48,7 @@ public class LocalBufferPoolDestroyTest {
LocalBufferPool localBufferPool = null;
try {
- networkBufferPool = new NetworkBufferPool(1, 4096);
+ networkBufferPool = new NetworkBufferPool(1, 4096, 1);
localBufferPool = new LocalBufferPool(networkBufferPool, 1);
// Drain buffer pool
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index a0e10d7..2b7fa7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -67,7 +67,7 @@ public class LocalBufferPoolTest extends TestLogger {
@Before
public void setupLocalBufferPool() {
- networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+ networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, 1);
localBufferPool = new LocalBufferPool(networkBufferPool, 1);
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 4e8eec6..89f673c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -60,7 +60,7 @@ public class NetworkBufferPoolTest extends TestLogger {
final int bufferSize = 128;
final int numBuffers = 10;
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, 1);
assertEquals(bufferSize, globalPool.getMemorySegmentSize());
assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
@@ -104,7 +104,7 @@ public class NetworkBufferPoolTest extends TestLogger {
@Test
public void testDestroyAll() {
try {
- NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 1);
BufferPool fixedPool = globalPool.createBufferPool(2, 2);
BufferPool boundedPool = globalPool.createBufferPool(0, 1);
@@ -192,18 +192,18 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
+ * Tests {@link NetworkBufferPool#requestMemorySegments()} with the {@link NetworkBufferPool}
* currently containing the number of required free segments.
*/
@Test
public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
final int numBuffers = 10;
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, numBuffers / 2);
List<MemorySegment> memorySegments = Collections.emptyList();
try {
- memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
+ memorySegments = globalPool.requestMemorySegments();
assertEquals(memorySegments.size(), numBuffers / 2);
globalPool.recycleMemorySegments(memorySegments);
@@ -216,17 +216,17 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
+ * Tests {@link NetworkBufferPool#requestMemorySegments()} with the number of required
* buffers exceeding the capacity of {@link NetworkBufferPool}.
*/
@Test
public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
final int numBuffers = 10;
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, numBuffers + 1);
try {
- globalPool.requestMemorySegments(numBuffers + 1);
+ globalPool.requestMemorySegments();
fail("Should throw an IOException");
} catch (IOException e) {
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
@@ -236,35 +236,26 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
+ * Tests {@link NetworkBufferPool} constructor with the invalid argument to
* cause exception.
*/
- @Test
- public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
- final int numBuffers = 10;
-
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
-
- try {
- // the number of requested buffers should be larger than zero
- globalPool.requestMemorySegments(0);
- fail("Should throw an IllegalArgumentException");
- } catch (IllegalArgumentException e) {
- assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
- } finally {
- globalPool.destroy();
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testRequestMemorySegmentsWithInvalidArgument() {
+ // the number of requested buffers should be larger than zero
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 0);
+ globalPool.destroy();
+ fail("Should throw an IllegalArgumentException");
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
+ * Tests {@link NetworkBufferPool#requestMemorySegments()} with the {@link NetworkBufferPool}
* currently not containing the number of required free segments (currently occupied by a buffer pool).
*/
@Test
public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
final int numBuffers = 10;
- NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, numBuffers / 2);
final List<Buffer> buffers = new ArrayList<>(numBuffers);
List<MemorySegment> memorySegments = Collections.emptyList();
@@ -298,7 +289,7 @@ public class NetworkBufferPoolTest extends TestLogger {
// take more buffers than are freely available at the moment via requestMemorySegments()
isRunning.await();
- memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
+ memorySegments = networkBufferPool.requestMemorySegments();
assertThat(memorySegments, not(hasItem(nullValue())));
} finally {
if (bufferRecycler != null) {
@@ -313,14 +304,14 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with an exception occurring during
+ * Tests {@link NetworkBufferPool#requestMemorySegments()} with an exception occurring during
* the call to {@link NetworkBufferPool#redistributeBuffers()}.
*/
@Test
public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throws IOException {
final int numBuffers = 3;
- NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, 2);
final List<Buffer> buffers = new ArrayList<>(numBuffers);
List<MemorySegment> memorySegments = Collections.emptyList();
@@ -339,7 +330,7 @@ public class NetworkBufferPoolTest extends TestLogger {
}
// this will ask the buffer pool to release its excess buffers which should fail
- memorySegments = networkBufferPool.requestMemorySegments(2);
+ memorySegments = networkBufferPool.requestMemorySegments();
fail("Requesting memory segments should have thrown during buffer pool redistribution.");
} catch (TestIOException e) {
// test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
@@ -361,7 +352,7 @@ public class NetworkBufferPoolTest extends TestLogger {
@Test
public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
final int numBuffers = 3;
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, 1);
final List<Buffer> buffers = new ArrayList<>(numBuffers);
BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers,
@@ -406,14 +397,14 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
+ * Tests {@link NetworkBufferPool#requestMemorySegments()}, verifying it may be aborted in
* case of a concurrent {@link NetworkBufferPool#destroy()} call.
*/
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
final int numBuffers = 10;
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, 10);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);
@@ -422,7 +413,7 @@ public class NetworkBufferPoolTest extends TestLogger {
@Override
public void go() throws Exception {
isRunning.trigger();
- globalPool.requestMemorySegments(10);
+ globalPool.requestMemorySegments();
}
};
asyncRequest.start();
@@ -445,14 +436,14 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
- * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
+ * Tests {@link NetworkBufferPool#requestMemorySegments()}, verifying it may be aborted and
* remains in a defined state even if the waiting is interrupted.
*/
@Test
public void testRequestMemorySegmentsInterruptable2() throws Exception {
final int numBuffers = 10;
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, 10);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);
@@ -461,7 +452,7 @@ public class NetworkBufferPoolTest extends TestLogger {
@Override
public void go() throws Exception {
isRunning.trigger();
- globalPool.requestMemorySegments(10);
+ globalPool.requestMemorySegments();
}
};
asyncRequest.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 8cdf221..f2808fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -136,14 +136,13 @@ public class CreditBasedPartitionRequestClientHandlerTest {
*/
@Test
public void testReceiveBuffer() throws Exception {
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
inputGate.setBufferPool(bufferPool);
- final int numExclusiveBuffers = 2;
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
handler.addInputChannel(inputChannel);
@@ -245,15 +244,14 @@ public class CreditBasedPartitionRequestClientHandlerTest {
final PartitionRequestClient client = new PartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel1 = createRemoteInputChannel(inputGate, client);
final RemoteInputChannel inputChannel2 = createRemoteInputChannel(inputGate, client);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
inputGate.setBufferPool(bufferPool);
- final int numExclusiveBuffers = 2;
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel1.requestSubpartition(0);
inputChannel2.requestSubpartition(0);
@@ -346,14 +344,13 @@ public class CreditBasedPartitionRequestClientHandlerTest {
final PartitionRequestClient client = new PartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
inputGate.setBufferPool(bufferPool);
- final int numExclusiveBuffers = 2;
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 16a2415..9017bf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -127,14 +127,13 @@ public class PartitionRequestClientHandlerTest {
*/
@Test
public void testReceiveBuffer() throws Exception {
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
inputGate.setBufferPool(bufferPool);
- final int numExclusiveBuffers = 2;
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
handler.addInputChannel(inputChannel);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
index 91a9d5a..6b5560e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
@@ -52,15 +52,15 @@ public class PartitionRequestClientTest {
final PartitionRequestClient client = new PartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+ final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, numExclusiveBuffers);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client, 1, 2);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
inputGate.setBufferPool(bufferPool);
- final int numExclusiveBuffers = 2;
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
// first subpartition request
inputChannel.requestSubpartition(0);
@@ -106,15 +106,15 @@ public class PartitionRequestClientTest {
final PartitionRequestClient client = new PartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+ final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, numExclusiveBuffers);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
inputGate.setBufferPool(bufferPool);
- final int numExclusiveBuffers = 2;
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
// The input channel should only send one partition request
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 0c25b24..a6ff087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -98,7 +98,7 @@ public class LocalInputChannelTest {
final NetworkBufferPool networkBuffers = new NetworkBufferPool(
(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
- TestBufferFactory.BUFFER_SIZE);
+ TestBufferFactory.BUFFER_SIZE, 1);
final ResultPartitionManager partitionManager = new ResultPartitionManager();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 67bc6f2..935ddb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -325,8 +325,7 @@ public class RemoteInputChannelTest {
@Test
public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
- final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32, 2);
final int numFloatingBuffers = 14;
final SingleInputGate inputGate = createSingleInputGate(1);
@@ -335,7 +334,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
// Prepare the exclusive and floating buffers to verify recycle logic later
@@ -464,8 +463,7 @@ public class RemoteInputChannelTest {
@Test
public void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
- final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32, 2);
final int numFloatingBuffers = 14;
final SingleInputGate inputGate = createSingleInputGate(1);
@@ -474,7 +472,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
// Prepare the exclusive and floating buffers to verify recycle logic later
@@ -539,8 +537,7 @@ public class RemoteInputChannelTest {
@Test
public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
- final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32, 2);
final int numFloatingBuffers = 14;
final SingleInputGate inputGate = createSingleInputGate(1);
@@ -549,7 +546,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
// Prepare the exclusive and floating buffers to verify recycle logic later
@@ -628,8 +625,8 @@ public class RemoteInputChannelTest {
@Test
public void testFairDistributionFloatingBuffers() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32);
final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, numExclusiveBuffers);
final int numFloatingBuffers = 3;
final SingleInputGate inputGate = createSingleInputGate(1);
@@ -640,7 +637,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
channel1.requestSubpartition(0);
channel2.requestSubpartition(0);
channel3.requestSubpartition(0);
@@ -695,11 +692,11 @@ public class RemoteInputChannelTest {
@Test
public void testFailureInNotifyBufferAvailable() throws Exception {
// Setup
- final int numExclusiveBuffers = 0;
+ final int numExclusiveBuffers = 1;
final int numFloatingBuffers = 1;
final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
- numTotalBuffers, 32);
+ numTotalBuffers, 32, numExclusiveBuffers);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
@@ -757,8 +754,7 @@ public class RemoteInputChannelTest {
@Test
public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32);
- final int numExclusiveBuffers = 2;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32, 2);
final int numFloatingBuffers = 128;
final ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -769,7 +765,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
final Callable<Void> requestBufferTask = new Callable<Void>() {
@@ -818,8 +814,8 @@ public class RemoteInputChannelTest {
@Test
public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
final int numExclusiveSegments = 120;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32, numExclusiveSegments);
final int numFloatingBuffers = 128;
final int backlog = 128;
@@ -831,7 +827,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveSegments);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
final Callable<Void> requestBufferTask = new Callable<Void>() {
@@ -870,8 +866,8 @@ public class RemoteInputChannelTest {
@Test
public void testConcurrentRecycleAndRelease() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
final int numExclusiveSegments = 120;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32, numExclusiveSegments);
final int numFloatingBuffers = 128;
final ExecutorService executor = Executors.newFixedThreadPool(3);
@@ -882,7 +878,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveSegments);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
final Callable<Void> releaseTask = new Callable<Void>() {
@@ -926,7 +922,7 @@ public class RemoteInputChannelTest {
final int numFloatingBuffers = 2;
final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
- numTotalBuffers, 32);
+ numTotalBuffers, 32, numExclusiveBuffers);
final ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -936,7 +932,7 @@ public class RemoteInputChannelTest {
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
- inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+ inputGate.assignExclusiveSegments(networkBufferPool);
inputChannel.requestSubpartition(0);
final Callable<Void> bufferPoolInteractionsTask = () -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 53fabe1..8a5a1a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -92,7 +92,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger {
@Before
public void setUp() throws Exception {
- networkBufferPool = new NetworkBufferPool(100, 8192);
+ networkBufferPool = new NetworkBufferPool(100, 8192, 1);
testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
final Configuration configuration = new Configuration();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index b1a3ad5..2ea957d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -53,8 +53,8 @@ public class BarrierBufferMassiveRandomTest {
try {
ioMan = new IOManagerAsync();
- networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE);
- networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE);
+ networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE, 1);
+ networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE, 1);
BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100);
BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100);