You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/02/07 09:15:08 UTC
[flink] branch master updated: [FLINK-25741][runtime] Skip buffer pools that have no floating buffers during buffer redistribution.
This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 855348a [FLINK-25741][runtime] Skip buffer pools that have no floating buffers during buffer redistribution.
855348a is described below
commit 855348a9ac482f141131c302b21d4a67e4c3e0c7
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Dec 30 17:27:26 2021 +0800
[FLINK-25741][runtime] Skip buffer pools that have no floating buffers during buffer redistribution.
This closes #18433.
---
.../io/network/buffer/NetworkBufferPool.java | 21 +++++++++++++++++----
1 file changed, 17 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 19e4cb8..ffdb9c7 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -83,6 +83,8 @@ public class NetworkBufferPool
private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
+ private final Set<LocalBufferPool> resizableBufferPools = new HashSet<>();
+
private int numTotalRequiredBuffers;
private final Duration requestSegmentsTimeout;
@@ -500,6 +502,10 @@ public class NetworkBufferPool
allBufferPools.add(localBufferPool);
+ if (numRequiredBuffers < maxUsedBuffers) {
+ resizableBufferPools.add(localBufferPool);
+ }
+
redistributeBuffers();
return localBufferPool;
@@ -515,6 +521,7 @@ public class NetworkBufferPool
synchronized (factoryLock) {
if (allBufferPools.remove(bufferPool)) {
numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
+ resizableBufferPools.remove(bufferPool);
redistributeBuffers();
}
@@ -536,7 +543,9 @@ public class NetworkBufferPool
}
// some sanity checks
- if (allBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+ if (allBufferPools.size() > 0
+ || numTotalRequiredBuffers > 0
+ || resizableBufferPools.size() > 0) {
throw new IllegalStateException(
"NetworkBufferPool is not empty after destroying all LocalBufferPools");
}
@@ -573,12 +582,16 @@ public class NetworkBufferPool
private void redistributeBuffers() {
assert Thread.holdsLock(factoryLock);
+ if (resizableBufferPools.isEmpty()) {
+ return;
+ }
+
// All buffers, which are not among the required ones
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
- for (LocalBufferPool bufferPool : allBufferPools) {
+ for (LocalBufferPool bufferPool : resizableBufferPools) {
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
}
return;
@@ -594,7 +607,7 @@ public class NetworkBufferPool
long totalCapacity = 0; // long to avoid int overflow
- for (LocalBufferPool bufferPool : allBufferPools) {
+ for (LocalBufferPool bufferPool : resizableBufferPools) {
int excessMax =
bufferPool.getMaxNumberOfMemorySegments()
- bufferPool.getNumberOfRequiredMemorySegments();
@@ -614,7 +627,7 @@ public class NetworkBufferPool
long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
- for (LocalBufferPool bufferPool : allBufferPools) {
+ for (LocalBufferPool bufferPool : resizableBufferPools) {
int excessMax =
bufferPool.getMaxNumberOfMemorySegments()
- bufferPool.getNumberOfRequiredMemorySegments();