You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2023/05/06 02:39:31 UTC

[flink] 02/03: [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.

This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c838d68a9db30c2cba021c1164a0cd464050da5a
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Apr 21 17:46:02 2023 +0800

    [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 11 +++++++++-
 .../io/network/buffer/NetworkBufferPool.java       | 18 ++++++++--------
 .../network/metrics/NettyShuffleMetricFactory.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java     |  5 +----
 .../io/network/buffer/NetworkBufferPoolTest.java   | 24 +++++++++++-----------
 5 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e012c725582..190734c35b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool {
     }
 
     /**
+     * Estimates the number of requested buffers.
+     *
      * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For
      *     unbounded pools it returns an approximation based upon {@link
      *     #getNumberOfRequiredMemorySegments()}
      */
-    public int getNumberOfRequestedMemorySegments() {
+    public int getEstimatedNumberOfRequestedMemorySegments() {
         if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
             return maxNumberOfMemorySegments;
         } else {
@@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool {
         }
     }
 
+    @VisibleForTesting
+    public int getNumberOfRequestedMemorySegments() {
+        synchronized (availableMemorySegments) {
+            return numberOfRequestedMemorySegments;
+        }
+    }
+
     @VisibleForTesting
     public int getNumberOfRequestedOverdraftMemorySegments() {
         synchronized (availableMemorySegments) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 24ca78fd551..c14f89fe096 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -366,36 +366,38 @@ public class NetworkBufferPool
         }
     }
 
-    public long getNumberOfRequestedMemorySegments() {
+    public long getEstimatedNumberOfRequestedMemorySegments() {
         long requestedSegments = 0;
         synchronized (factoryLock) {
             for (LocalBufferPool bufferPool : allBufferPools) {
-                requestedSegments += bufferPool.getNumberOfRequestedMemorySegments();
+                requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments();
             }
         }
         return requestedSegments;
     }
 
-    public long getRequestedMemory() {
-        return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+    public long getEstimatedRequestedMemory() {
+        return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize;
     }
 
-    public int getRequestedSegmentsUsage() {
+    public int getEstimatedRequestedSegmentsUsage() {
         int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments();
         return totalNumberOfMemorySegments == 0
                 ? 0
                 : Math.toIntExact(
-                        100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments);
+                        100L
+                                * getEstimatedNumberOfRequestedMemorySegments()
+                                / totalNumberOfMemorySegments);
     }
 
     @VisibleForTesting
     Optional<String> getUsageWarning() {
-        int currentUsage = getRequestedSegmentsUsage();
+        int currentUsage = getEstimatedRequestedSegmentsUsage();
         Optional<String> message = Optional.empty();
         // do not log warning if the value hasn't changed to avoid spamming warnings.
         if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) {
             long totalMemory = getTotalMemory();
-            long requestedMemory = getRequestedMemory();
+            long requestedMemory = getEstimatedRequestedMemory();
             long missingMemory = requestedMemory - totalMemory;
             message =
                     Optional.of(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
index 2aaffe2d681..817190732b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
@@ -244,7 +244,7 @@ public class NettyShuffleMetricFactory {
 
         @Override
         public Integer getValue() {
-            return networkBufferPool.getRequestedSegmentsUsage();
+            return networkBufferPool.getEstimatedRequestedSegmentsUsage();
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 6c0fcf13b47..4646fee2f49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -309,10 +309,7 @@ class LocalBufferPoolTest {
         assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
         assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
                 .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing);
-        assertThat(
-                        bufferPool.bestEffortGetNumOfUsedBuffers()
-                                + bufferPool.getNumberOfAvailableMemorySegments()
-                                - bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+        assertThat(bufferPool.getNumberOfRequestedMemorySegments())
                 .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing);
         assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                 .isEqualTo(numAvailableBuffersAfterDecreasing);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 5a5bd186b3f..20fd5964b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -330,7 +330,7 @@ public class NetworkBufferPoolTest extends TestLogger {
         try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
             NetworkBufferPool globalPool = new NetworkBufferPool(0, 128);
             closeableRegistry.registerCloseable(globalPool::destroy);
-            assertEquals(0, globalPool.getRequestedSegmentsUsage());
+            assertEquals(0, globalPool.getEstimatedRequestedSegmentsUsage());
         }
     }
 
@@ -342,21 +342,21 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             BufferPool bufferPool1 = globalPool.createBufferPool(10, 20);
 
-            assertEquals(20, globalPool.getNumberOfRequestedMemorySegments());
-            assertEquals(40, globalPool.getRequestedSegmentsUsage());
+            assertEquals(20, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+            assertEquals(40, globalPool.getEstimatedRequestedSegmentsUsage());
             assertThat(globalPool.getUsageWarning(), equalTo(Optional.empty()));
 
             closeableRegistry.registerCloseable(
                     (globalPool.createBufferPool(5, Integer.MAX_VALUE))::lazyDestroy);
 
-            assertEquals(30, globalPool.getNumberOfRequestedMemorySegments());
-            assertEquals(60, globalPool.getRequestedSegmentsUsage());
+            assertEquals(30, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+            assertEquals(60, globalPool.getEstimatedRequestedSegmentsUsage());
             assertThat(globalPool.getUsageWarning(), equalTo(Optional.empty()));
 
             closeableRegistry.registerCloseable((globalPool.createBufferPool(10, 30))::lazyDestroy);
 
-            assertEquals(60, globalPool.getNumberOfRequestedMemorySegments());
-            assertEquals(120, globalPool.getRequestedSegmentsUsage());
+            assertEquals(60, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+            assertEquals(120, globalPool.getEstimatedRequestedSegmentsUsage());
             assertThat(
                     globalPool.getUsageWarning(),
                     equalTo(
@@ -372,8 +372,8 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             BufferPool bufferPool2 = globalPool.createBufferPool(10, 20);
 
-            assertEquals(80, globalPool.getNumberOfRequestedMemorySegments());
-            assertEquals(160, globalPool.getRequestedSegmentsUsage());
+            assertEquals(80, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+            assertEquals(160, globalPool.getEstimatedRequestedSegmentsUsage());
             assertThat(
                     globalPool.getUsageWarning(),
                     equalTo(
@@ -389,9 +389,9 @@ public class NetworkBufferPoolTest extends TestLogger {
             bufferPool2.lazyDestroy();
             bufferPool1.lazyDestroy();
 
-            assertEquals(40, globalPool.getNumberOfRequestedMemorySegments());
-            assertEquals(40 * 128, globalPool.getRequestedMemory());
-            assertEquals(80, globalPool.getRequestedSegmentsUsage());
+            assertEquals(40, globalPool.getEstimatedNumberOfRequestedMemorySegments());
+            assertEquals(40 * 128, globalPool.getEstimatedRequestedMemory());
+            assertEquals(80, globalPool.getEstimatedRequestedSegmentsUsage());
             assertThat(
                     globalPool.getUsageWarning(),
                     equalTo(Optional.of("Memory usage [80%] went back to normal")));