Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 13:42:42 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #18392: [FLINK-25590][metrics] Introduce RequestedMemoryUsage and log warnings if usage exceeds 100%

pnowojski commented on a change in pull request #18392:
URL: https://github.com/apache/flink/pull/18392#discussion_r790754115



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
##########
@@ -356,6 +362,76 @@ public int getNumberOfRegisteredBufferPools() {
         }
     }
 
+    public long getNumberOfRequestedMemorySegments() {
+        long requestedSegments = 0;
+        synchronized (factoryLock) {
+            for (LocalBufferPool bufferPool : allBufferPools) {
+                int maxNumberOfMemorySegments = bufferPool.getMaxNumberOfMemorySegments();
+                /**
+                 * As defined in {@link
+                 * org.apache.flink.runtime.shuffle.NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition(int,
+                 * int, int, int, int,
+                 * org.apache.flink.runtime.io.network.partition.ResultPartitionType)}. Unbounded
+                 * subpartitions have {@link maxNumberOfMemorySegments} set to {@code
+                 * Integer.MAX_VALUE}. In this case let's use number of required segments instead.
+                 */
+                if (maxNumberOfMemorySegments < Integer.MAX_VALUE) {
+                    requestedSegments += maxNumberOfMemorySegments;
+                } else {
+                    requestedSegments += bufferPool.getNumberOfRequiredMemorySegments();
+                }
+            }
+        }
+        return requestedSegments;
+    }
+
+    public long getRequestedMemory() {
+        return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+    }
+
+    public int getRequestedSegmentsUsage() {
+        return Math.toIntExact(
+                100L * getNumberOfRequestedMemorySegments() / getTotalNumberOfMemorySegments());
+    }
+
+    @VisibleForTesting
+    Optional<String> getUsageWarning() {
+        int currentUsage = getRequestedSegmentsUsage();

Review comment:
       > Or do we have the case when we create new buffer pool in runtime?
   
   Yes, that's possible. First and foremost, this can happen if you have one cluster running multiple, potentially short-lived, ad hoc jobs. Secondly, in batch execution not all tasks are scheduled at once. Tasks from later stages are unblocked only once their upstream tasks finish producing their results, so at different points in time a different number of tasks, with different memory requirements, can be running even for a single job. Lastly, during startup not all `LocalBufferPool`s are created at once, and there might be a significant delay between the startup times of different tasks.
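To illustrate why the requested usage has to be recomputed rather than fixed once, here is a minimal sketch of pools being registered and destroyed at runtime. It is not Flink's implementation: `RequestedUsageSketch`, `PoolModel`, `register` and `unregister` are hypothetical names, and only the per-pool rule (use the maximum when it is bounded, otherwise the required number of segments) is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a network buffer pool; hypothetical, not Flink's class. */
public class RequestedUsageSketch {

    /** Hypothetical stand-in for a local buffer pool: only the two numbers the metric reads. */
    static final class PoolModel {
        final int requiredSegments;
        final int maxSegments; // Integer.MAX_VALUE models an unbounded pool

        PoolModel(int requiredSegments, int maxSegments) {
            this.requiredSegments = requiredSegments;
            this.maxSegments = maxSegments;
        }
    }

    private final Object lock = new Object();
    private final List<PoolModel> pools = new ArrayList<>();
    private final int totalSegments;

    RequestedUsageSketch(int totalSegments) {
        this.totalSegments = totalSegments;
    }

    /** Called whenever a task starts, possibly long after the first pools were created. */
    void register(PoolModel pool) {
        synchronized (lock) {
            pools.add(pool);
        }
    }

    /** Called whenever a task finishes and releases its pool. */
    void unregister(PoolModel pool) {
        synchronized (lock) {
            pools.remove(pool);
        }
    }

    /** Same per-pool rule as in the diff: bounded pools count their max, unbounded their required. */
    long requestedSegments() {
        long requested = 0;
        synchronized (lock) {
            for (PoolModel pool : pools) {
                requested += pool.maxSegments < Integer.MAX_VALUE
                        ? pool.maxSegments
                        : pool.requiredSegments;
            }
        }
        return requested;
    }

    /** Requested segments as a percentage of the total; may exceed 100. */
    int requestedUsagePercent() {
        return Math.toIntExact(100L * requestedSegments() / totalSegments);
    }

    public static void main(String[] args) {
        RequestedUsageSketch network = new RequestedUsageSketch(100);

        PoolModel first = new PoolModel(10, 60);
        network.register(first);
        System.out.println(network.requestedUsagePercent()); // 60

        // A later task (or another job) starts and adds an unbounded pool.
        network.register(new PoolModel(50, Integer.MAX_VALUE));
        System.out.println(network.requestedUsagePercent()); // 110

        // The first task finishes and its pool is destroyed.
        network.unregister(first);
        System.out.println(network.requestedUsagePercent()); // 50
    }
}
```

In this model the value moves from 60 to 110 and back to 50 as pools come and go, so a check done only at startup would miss the window in which usage exceeds 100%.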



