You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/26 12:56:42 UTC

[incubator-celeborn] branch main updated: [CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 817eee96 [CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
817eee96 is described below

commit 817eee969f2a7ce5ebcee55efa3525cf6b84912d
Author: nafiy <30...@users.noreply.github.com>
AuthorDate: Sat Nov 26 20:56:39 2022 +0800

    [CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
---
 .../main/scala/org/apache/celeborn/client/LifecycleManager.scala  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c28621bf..139c4503 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1195,6 +1195,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       shuffleId: Int,
       slots: WorkerResource): util.List[WorkerInfo] = {
     val reserveSlotFailedWorkers = new ConcurrentHashMap[WorkerInfo, (StatusCode, Long)]()
+    val failureInfos = new util.concurrent.CopyOnWriteArrayList[String]()
     val parallelism = Math.min(Math.max(1, slots.size()), conf.rpcMaxParallelism)
     ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
       case (workerInfo, (masterLocations, slaveLocations)) =>
@@ -1215,13 +1216,16 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
             s" from worker ${workerInfo.readableAddress()}.")
         } else {
-          logError(s"[reserveSlots] Failed to" +
+          failureInfos.add(s"[reserveSlots] Failed to" +
             s" reserve buffers for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
             s" from worker ${workerInfo.readableAddress()}. Reason: ${res.reason}")
           reserveSlotFailedWorkers.put(workerInfo, (res.status, System.currentTimeMillis()))
         }
     }
-
+    if (failureInfos.asScala.nonEmpty) {
+      logError(s"Aggregated error of reserveSlots failure:${failureInfos.asScala.foldLeft("")(
+        (x, y) => s"$x \n $y")}")
+    }
     recordWorkerFailure(reserveSlotFailedWorkers)
     new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
   }