Posted to commits@celeborn.apache.org by et...@apache.org on 2024/01/15 06:22:20 UTC

(incubator-celeborn) branch main updated: [CELEBORN-1220][IMPROVEMENT] Make trim logic more robust

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new cbdabf4ae [CELEBORN-1220][IMPROVEMENT] Make trim logic more robust
cbdabf4ae is described below

commit cbdabf4ae6fb9a9741ac752f5ab08bf4646227f6
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Jan 15 14:22:03 2024 +0800

    [CELEBORN-1220][IMPROVEMENT] Make trim logic more robust
    
    ### What changes were proposed in this pull request?
    We encountered a case where the trim action stopped without setting trimInProcess back to false. After that, the worker never triggers a new trim and stays in the push-data pause state. Already pushed data is never released (replicated data still works), so the worker never recovers to accept push data requests again.
    
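    This PR makes the trim logic more robust. MemoryManager now resets trimInProcess in a finally block. StorageManager.onTrim now catches and logs exceptions from flushFileWriters() instead of letting them propagate.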
    
    ### Why are the changes needed?
    A failed trim action must not leave trimInProcess set to true, otherwise the worker can stay paused indefinitely.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No
    
    Closes #2224 from AngersZhuuuu/CELEBORN-1220.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: mingji <fe...@alibaba-inc.com>
---
 .../service/deploy/worker/memory/MemoryManager.java        | 14 ++++++++++----
 .../service/deploy/worker/storage/StorageManager.scala     |  5 +++--
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index e04975ec8..71861f17a 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -317,10 +317,16 @@ public class MemoryManager {
     if (trimInProcess.compareAndSet(false, true)) {
       actionService.submit(
           () -> {
-            // In current code, StorageManager will add into this before ChannelsLimiter,
-            // so all behaviors of StorageManger will execute before ChannelsLimiter.
-            memoryPressureListeners.forEach(MemoryPressureListener::onTrim);
-            trimInProcess.set(false);
+            try {
+              // In current code, StorageManager will add into this before ChannelsLimiter,
+              // so all behaviors of StorageManger will execute before ChannelsLimiter.
+              memoryPressureListeners.forEach(MemoryPressureListener::onTrim);
+            } finally {
+              // MemoryManager uses this flag to avoid parallel trigger trim action,
+              // We should make sure set this value back, otherwise it won't trigger trim action
+              // again.
+              trimInProcess.set(false);
+            }
           });
     }
   }
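The failure mode described in the commit message can be reproduced in isolation. This is a minimal standalone sketch, not Celeborn code. TrimGuardSketch, its Listener interface, and the trimUnsafe/trimSafe methods are hypothetical stand-ins for MemoryManager, MemoryPressureListener, and the before/after shapes of the trim trigger.

It assumes, as in the diff, that the trim runs through ExecutorService.submit(). That method captures a thrown exception in the returned Future rather than logging it. Without the finally block, the flag therefore stays true, and every later compareAndSet(false, true) quietly fails.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class TrimGuardSketch {
  // Hypothetical stand-in for MemoryPressureListener.
  interface Listener {
    void onTrim();
  }

  private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
  private final ExecutorService actionService = Executors.newSingleThreadExecutor();
  private final List<Listener> listeners;

  TrimGuardSketch(List<Listener> listeners) {
    this.listeners = listeners;
  }

  // Pre-patch shape: the reset is skipped if any listener throws.
  Future<?> trimUnsafe() {
    if (trimInProcess.compareAndSet(false, true)) {
      return actionService.submit(
          () -> {
            listeners.forEach(Listener::onTrim);
            trimInProcess.set(false);
          });
    }
    return null; // a trim is believed to be running, so nothing is submitted
  }

  // Post-patch shape: the reset always runs, even if a listener throws.
  Future<?> trimSafe() {
    if (trimInProcess.compareAndSet(false, true)) {
      return actionService.submit(
          () -> {
            try {
              listeners.forEach(Listener::onTrim);
            } finally {
              trimInProcess.set(false);
            }
          });
    }
    return null;
  }

  // Runs one trim whose only listener throws, waits for it, and reports whether
  // trimInProcess is still true afterwards (true means every later trim is skipped).
  static boolean flagStuckAfterFailure(boolean useFinally) {
    Listener failing =
        () -> {
          throw new IllegalStateException("simulated flush failure");
        };
    TrimGuardSketch manager = new TrimGuardSketch(List.of(failing));
    try {
      Future<?> future = useFinally ? manager.trimSafe() : manager.trimUnsafe();
      try {
        future.get();
      } catch (ExecutionException e) {
        // submit() stored the exception in the Future; nothing logged it.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
      return manager.trimInProcess.get();
    } finally {
      manager.actionService.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("without finally, flag stuck: " + flagStuckAfterFailure(false));
    System.out.println("with finally, flag stuck: " + flagStuckAfterFailure(true));
  }
}
```

Running main prints true for the pre-patch shape and false for the patched shape. Future.get() establishes a happens-before edge with the task, so reading the flag after it returns or throws reliably observes the finally block's write.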
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index d90b29f04..6fcc98050 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -696,11 +696,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
 
   override def onTrim(): Unit = {
     logInfo(s"Trigger ${this.getClass.getCanonicalName} trim action")
-    flushFileWriters()
     try {
+      flushFileWriters()
       Thread.sleep(conf.workerDirectMemoryTrimFlushWaitInterval)
     } catch {
-      case _: Exception => // Do nothing
+      case e: Exception =>
+        logError(s"Trigger ${this.getClass.getCanonicalName} trim failed.", e)
     }
   }
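The StorageManager change still matters with the finally block in place, because Iterable.forEach stops at the first listener that throws. According to the comment in MemoryManager, StorageManager is registered before ChannelsLimiter. If flushFileWriters() threw out of StorageManager.onTrim, ChannelsLimiter's trim would therefore be skipped for that round.

The following is a hypothetical illustration only. ListenerIsolationSketch, flushThatFails, and the two listener lambdas are not Celeborn APIs, and System.err stands in for logError. It compares a listener that lets the exception escape with one that catches and logs it, as the patched onTrim does.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerIsolationSketch {
  // Hypothetical stand-in for MemoryPressureListener.
  interface Listener {
    void onTrim();
  }

  // Hypothetical stand-in for flushFileWriters(); always fails in this sketch.
  static void flushThatFails() {
    throw new IllegalStateException("simulated flush failure");
  }

  // Returns the names of the listeners whose onTrim returned normally.
  static List<String> runTrim(boolean catchInListener) {
    List<String> completed = new ArrayList<>();
    Listener storage =
        () -> {
          if (catchInListener) {
            try {
              flushThatFails();
            } catch (Exception e) {
              // Stands in for logError(...) in the patched onTrim.
              System.err.println("trim failed: " + e.getMessage());
            }
          } else {
            flushThatFails(); // pre-patch: exception escapes onTrim
          }
          completed.add("storage");
        };
    Listener channels = () -> completed.add("channels");
    try {
      List.of(storage, channels).forEach(Listener::onTrim);
    } catch (RuntimeException e) {
      // forEach stops at the first exception; later listeners are skipped.
    }
    return completed;
  }

  public static void main(String[] args) {
    System.out.println(runTrim(false)); // prints []
    System.out.println(runTrim(true)); // prints [storage, channels]
  }
}
```

Catching inside the listener keeps each listener's failure local to it. The finally block in MemoryManager is the last line of defence for the flag itself.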