Posted to commits@celeborn.apache.org by et...@apache.org on 2024/01/15 06:22:20 UTC
(incubator-celeborn) branch main updated: [CELEBORN-1220][IMPROVEMENT] Make trim logic more robust
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cbdabf4ae [CELEBORN-1220][IMPROVEMENT] Make trim logic more robust
cbdabf4ae is described below
commit cbdabf4ae6fb9a9741ac752f5ab08bf4646227f6
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Jan 15 14:22:03 2024 +0800
[CELEBORN-1220][IMPROVEMENT] Make trim logic more robust
### What changes were proposed in this pull request?
We hit a case where the trim action stopped without setting trimInProcess back to false. The worker then never triggers a new trim and keeps push data paused. Data that was already pushed is never released (replicate data can still work), so the worker never resumes accepting push data requests.
This PR makes the logic more robust.
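The failure mode can be illustrated with a minimal, self-contained sketch. It is not Celeborn's MemoryManager: the class `TrimGuardSketch`, the `Runnable` listeners, and the helper `secondTrimStartsAfterFailure` are hypothetical names introduced here, and only the `compareAndSet` guard plus the `try`/`finally` reset mirror the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative sketch only; not Celeborn's MemoryManager. */
public class TrimGuardSketch {
  // Guards against overlapping trims, like trimInProcess in the patch.
  private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
  private final ExecutorService actionService = Executors.newSingleThreadExecutor();
  // Hypothetical stand-in for the memory pressure listeners.
  private final List<Runnable> listeners = new ArrayList<>();

  void addListener(Runnable listener) {
    listeners.add(listener);
  }

  /** Submits a trim and returns its Future, or null if one is already running. */
  Future<?> trim() {
    if (!trimInProcess.compareAndSet(false, true)) {
      return null;
    }
    return actionService.submit(
        () -> {
          try {
            listeners.forEach(Runnable::run);
          } finally {
            // Runs even when a listener throws, so later trims are not blocked.
            trimInProcess.set(false);
          }
        });
  }

  /** Runs a trim whose listener fails, then reports whether a second trim can start. */
  static boolean secondTrimStartsAfterFailure() {
    TrimGuardSketch sketch = new TrimGuardSketch();
    sketch.addListener(
        () -> {
          throw new IllegalStateException("simulated listener failure");
        });
    try {
      Future<?> first = sketch.trim();
      try {
        first.get(); // Wait for the failing trim to finish.
      } catch (ExecutionException expected) {
        // The listener's exception surfaces here; the flag was already reset.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
      return sketch.trim() != null;
    } finally {
      sketch.actionService.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(secondTrimStartsAfterFailure());
  }
}
```

Without the `finally` block, the failing listener would leave the flag stuck at `true` and every later `trim()` call in this sketch would return `null`, which corresponds to the stuck state described above.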
### Why are the changes needed?
To make the trim logic more robust.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No
Closes #2224 from AngersZhuuuu/CELEBORN-1220.
Authored-by: Angerszhuuuu <an...@gmail.com>
Signed-off-by: mingji <fe...@alibaba-inc.com>
---
.../service/deploy/worker/memory/MemoryManager.java | 14 ++++++++++----
.../service/deploy/worker/storage/StorageManager.scala | 5 +++--
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index e04975ec8..71861f17a 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -317,10 +317,16 @@ public class MemoryManager {
if (trimInProcess.compareAndSet(false, true)) {
actionService.submit(
() -> {
- // In current code, StorageManager will add into this before ChannelsLimiter,
- // so all behaviors of StorageManger will execute before ChannelsLimiter.
- memoryPressureListeners.forEach(MemoryPressureListener::onTrim);
- trimInProcess.set(false);
+ try {
+ // In current code, StorageManager will add into this before ChannelsLimiter,
+ // so all behaviors of StorageManger will execute before ChannelsLimiter.
+ memoryPressureListeners.forEach(MemoryPressureListener::onTrim);
+ } finally {
+ // MemoryManager uses this flag to avoid parallel trigger trim action,
+ // We should make sure set this value back, otherwise it won't trigger trim action
+ // again.
+ trimInProcess.set(false);
+ }
});
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index d90b29f04..6fcc98050 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -696,11 +696,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
override def onTrim(): Unit = {
logInfo(s"Trigger ${this.getClass.getCanonicalName} trim action")
- flushFileWriters()
try {
+ flushFileWriters()
Thread.sleep(conf.workerDirectMemoryTrimFlushWaitInterval)
} catch {
- case _: Exception => // Do nothing
+ case e: Exception =>
+ logError(s"Trigger ${this.getClass.getCanonicalName} trim failed.", e)
}
}