You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2022/11/30 09:52:15 UTC

[incubator-uniffle] branch branch-0.6 updated: [BUG] Potential memory leak when encountering disk unhealthy (#370)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 36759f38 [BUG] Potential memory leak when encountering disk unhealthy (#370)
36759f38 is described below

commit 36759f382e9c184c9ec727b2d04a04c90ca7e16f
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Tue Nov 29 14:04:26 2022 +0800

    [BUG] Potential memory leak when encountering disk unhealthy (#370)
    
    ### What changes were proposed in this pull request?
    
    Fix a potential memory leak that occurs when a disk becomes unhealthy.
    
    ### Why are the changes needed?
    
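    When a disk is unhealthy and a pending flush event (`PendingShuffleFlushEvent`) exceeds its timeout, the event is dropped and its memory is released. However, the current code does not release the event's data reference: it never calls `clearInFlushBuffer` on the event's `ShuffleBuffer`, so the data stays referenced and memory leaks. This patch moves the cleanup into a shared `cleanupFlushEventData` method. That method clears the in-flush buffer and releases the memory, and it is called both after a normal flush and when a pending event is dropped.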
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    No need.
---
 .../apache/uniffle/server/ShuffleFlushManager.java   | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index daae92df..96f89296 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -208,12 +208,8 @@ public class ShuffleFlushManager {
       // just log the error, don't throw the exception and stop the flush thread
       LOG.error("Exception happened when process flush shuffle data for " + event, e);
     } finally {
-      ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
-      if (shuffleBuffer != null) {
-        shuffleBuffer.clearInFlushBuffer(event.getEventId());
-      }
+      cleanupFlushEventData(event);
       if (shuffleServer != null) {
-        shuffleServer.getShuffleBufferManager().releaseMemory(event.getSize(), true, false);
         long duration = System.currentTimeMillis() - start;
         if (writeSuccess) {
           LOG.debug("Flush to file success in " + duration + " ms and release " + event.getSize() + " bytes");
@@ -310,14 +306,22 @@ public class ShuffleFlushManager {
     addPendingEventsInternal(event);
   }
 
-  private void dropPendingEvent(PendingShuffleFlushEvent event) {
-    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+  private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
+    ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
+    if (shuffleBuffer != null) {
+      shuffleBuffer.clearInFlushBuffer(event.getEventId());
+    }
     if (shuffleServer != null) {
       shuffleServer.getShuffleBufferManager().releaseMemory(
-          event.getEvent().getSize(), true, false);
+          event.getSize(), true, false);
     }
   }
 
+  private void dropPendingEvent(PendingShuffleFlushEvent event) {
+    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+    cleanupFlushEventData(event.getEvent());
+  }
+
   @VisibleForTesting
   void addPendingEvents(ShuffleDataFlushEvent event) {
     addPendingEventsInternal(new PendingShuffleFlushEvent(event));