You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2023/05/06 02:39:31 UTC
[flink] 02/03: [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c838d68a9db30c2cba021c1164a0cd464050da5a
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Apr 21 17:46:02 2023 +0800
[FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
---
.../runtime/io/network/buffer/LocalBufferPool.java | 11 +++++++++-
.../io/network/buffer/NetworkBufferPool.java | 18 ++++++++--------
.../network/metrics/NettyShuffleMetricFactory.java | 2 +-
.../io/network/buffer/LocalBufferPoolTest.java | 5 +----
.../io/network/buffer/NetworkBufferPoolTest.java | 24 +++++++++++-----------
5 files changed, 34 insertions(+), 26 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e012c725582..190734c35b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool {
}
/**
+ * Estimates the number of requested buffers.
+ *
* @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For
* unbounded pools it returns an approximation based upon {@link
* #getNumberOfRequiredMemorySegments()}
*/
- public int getNumberOfRequestedMemorySegments() {
+ public int getEstimatedNumberOfRequestedMemorySegments() {
if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
return maxNumberOfMemorySegments;
} else {
@@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool {
}
}
+ @VisibleForTesting
+ public int getNumberOfRequestedMemorySegments() {
+ synchronized (availableMemorySegments) {
+ return numberOfRequestedMemorySegments;
+ }
+ }
+
@VisibleForTesting
public int getNumberOfRequestedOverdraftMemorySegments() {
synchronized (availableMemorySegments) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 24ca78fd551..c14f89fe096 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -366,36 +366,38 @@ public class NetworkBufferPool
}
}
- public long getNumberOfRequestedMemorySegments() {
+ public long getEstimatedNumberOfRequestedMemorySegments() {
long requestedSegments = 0;
synchronized (factoryLock) {
for (LocalBufferPool bufferPool : allBufferPools) {
- requestedSegments += bufferPool.getNumberOfRequestedMemorySegments();
+ requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments();
}
}
return requestedSegments;
}
- public long getRequestedMemory() {
- return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+ public long getEstimatedRequestedMemory() {
+ return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize;
}
- public int getRequestedSegmentsUsage() {
+ public int getEstimatedRequestedSegmentsUsage() {
int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments();
return totalNumberOfMemorySegments == 0
? 0
: Math.toIntExact(
- 100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments);
+ 100L
+ * getEstimatedNumberOfRequestedMemorySegments()
+ / totalNumberOfMemorySegments);
}
@VisibleForTesting
Optional<String> getUsageWarning() {
- int currentUsage = getRequestedSegmentsUsage();
+ int currentUsage = getEstimatedRequestedSegmentsUsage();
Optional<String> message = Optional.empty();
// do not log warning if the value hasn't changed to avoid spamming warnings.
if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) {
long totalMemory = getTotalMemory();
- long requestedMemory = getRequestedMemory();
+ long requestedMemory = getEstimatedRequestedMemory();
long missingMemory = requestedMemory - totalMemory;
message =
Optional.of(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
index 2aaffe2d681..817190732b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
@@ -244,7 +244,7 @@ public class NettyShuffleMetricFactory {
@Override
public Integer getValue() {
- return networkBufferPool.getRequestedSegmentsUsage();
+ return networkBufferPool.getEstimatedRequestedSegmentsUsage();
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 6c0fcf13b47..4646fee2f49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -309,10 +309,7 @@ class LocalBufferPoolTest {
assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
.isEqualTo(numRequestedOverdraftBuffersAfterDecreasing);
- assertThat(
- bufferPool.bestEffortGetNumOfUsedBuffers()
- + bufferPool.getNumberOfAvailableMemorySegments()
- - bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+ assertThat(bufferPool.getNumberOfRequestedMemorySegments())
.isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing);
assertThat(bufferPool.getNumberOfAvailableMemorySegments())
.isEqualTo(numAvailableBuffersAfterDecreasing);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 5a5bd186b3f..20fd5964b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -330,7 +330,7 @@ public class NetworkBufferPoolTest extends TestLogger {
try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
NetworkBufferPool globalPool = new NetworkBufferPool(0, 128);
closeableRegistry.registerCloseable(globalPool::destroy);
- assertEquals(0, globalPool.getRequestedSegmentsUsage());
+ assertEquals(0, globalPool.getEstimatedRequestedSegmentsUsage());
}
}
@@ -342,21 +342,21 @@ public class NetworkBufferPoolTest extends TestLogger {
BufferPool bufferPool1 = globalPool.createBufferPool(10, 20);
- assertEquals(20, globalPool.getNumberOfRequestedMemorySegments());
- assertEquals(40, globalPool.getRequestedSegmentsUsage());
+ assertEquals(20, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+ assertEquals(40, globalPool.getEstimatedRequestedSegmentsUsage());
assertThat(globalPool.getUsageWarning(), equalTo(Optional.empty()));
closeableRegistry.registerCloseable(
(globalPool.createBufferPool(5, Integer.MAX_VALUE))::lazyDestroy);
- assertEquals(30, globalPool.getNumberOfRequestedMemorySegments());
- assertEquals(60, globalPool.getRequestedSegmentsUsage());
+ assertEquals(30, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+ assertEquals(60, globalPool.getEstimatedRequestedSegmentsUsage());
assertThat(globalPool.getUsageWarning(), equalTo(Optional.empty()));
closeableRegistry.registerCloseable((globalPool.createBufferPool(10, 30))::lazyDestroy);
- assertEquals(60, globalPool.getNumberOfRequestedMemorySegments());
- assertEquals(120, globalPool.getRequestedSegmentsUsage());
+ assertEquals(60, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+ assertEquals(120, globalPool.getEstimatedRequestedSegmentsUsage());
assertThat(
globalPool.getUsageWarning(),
equalTo(
@@ -372,8 +372,8 @@ public class NetworkBufferPoolTest extends TestLogger {
BufferPool bufferPool2 = globalPool.createBufferPool(10, 20);
- assertEquals(80, globalPool.getNumberOfRequestedMemorySegments());
- assertEquals(160, globalPool.getRequestedSegmentsUsage());
+ assertEquals(80, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+ assertEquals(160, globalPool.getEstimatedRequestedSegmentsUsage());
assertThat(
globalPool.getUsageWarning(),
equalTo(
@@ -389,9 +389,9 @@ public class NetworkBufferPoolTest extends TestLogger {
bufferPool2.lazyDestroy();
bufferPool1.lazyDestroy();
- assertEquals(40, globalPool.getNumberOfRequestedMemorySegments());
- assertEquals(40 * 128, globalPool.getRequestedMemory());
- assertEquals(80, globalPool.getRequestedSegmentsUsage());
+ assertEquals(40, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+ assertEquals(40 * 128, globalPool.getEstimatedRequestedMemory());
+ assertEquals(80, globalPool.getEstimatedRequestedSegmentsUsage());
assertThat(
globalPool.getUsageWarning(),
equalTo(Optional.of("Memory usage [80%] went back to normal")));