You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/18 15:09:32 UTC

[flink] 02/02: [FLINK-28800][network] HsFileDataManager should avoid busy-loop when fileReader has not data to read

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d13403429db27d63fdd6932c65c23ed5b90ef96
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 17 15:56:12 2022 +0800

    [FLINK-28800][network] HsFileDataManager should avoid busy-loop when fileReader has not data to read
    
    This closes #20553
---
 .../partition/hybrid/HsFileDataManager.java        | 29 ++++++++++++++--------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index eaef4110edc..75d322f6452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -49,6 +49,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -275,16 +276,16 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         return new ArrayDeque<>();
     }
 
-    @GuardedBy("lock")
     private void mayTriggerReading() {
-        assert Thread.holdsLock(lock);
-
-        if (!isRunning
-                && !allReaders.isEmpty()
-                && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
-                && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
-            isRunning = true;
-            ioExecutor.execute(this);
+        synchronized (lock) {
+            if (!isRunning
+                    && !allReaders.isEmpty()
+                    && numRequestedBuffers + bufferPool.getNumBuffersPerRequest()
+                            <= maxRequestedBuffers
+                    && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
+                isRunning = true;
+                ioExecutor.execute(this);
+            }
         }
     }
 
@@ -364,9 +365,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         synchronized (lock) {
             numRequestedBuffers += numBuffersRead;
             isRunning = false;
-            mayTriggerReading();
             mayNotifyReleased();
         }
+        if (numBuffersRead == 0) {
+            // When fileReader has no data to read, for example, most of the data is
+            // consumed from memory. HsFileDataManager will encounter busy-loop
+            // problem, which will lead to a meaningless surge in CPU utilization
+            // and seriously affect performance.
+            ioExecutor.schedule(this::mayTriggerReading, 5, TimeUnit.MILLISECONDS);
+        } else {
+            mayTriggerReading();
+        }
     }
 
     @GuardedBy("lock")