You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/26 12:56:42 UTC
[incubator-celeborn] branch main updated: [CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 817eee96 [CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
817eee96 is described below
commit 817eee969f2a7ce5ebcee55efa3525cf6b84912d
Author: nafiy <30...@users.noreply.github.com>
AuthorDate: Sat Nov 26 20:56:39 2022 +0800
[CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
---
.../main/scala/org/apache/celeborn/client/LifecycleManager.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c28621bf..139c4503 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1195,6 +1195,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
shuffleId: Int,
slots: WorkerResource): util.List[WorkerInfo] = {
val reserveSlotFailedWorkers = new ConcurrentHashMap[WorkerInfo, (StatusCode, Long)]()
+ val failureInfos = new util.concurrent.CopyOnWriteArrayList[String]()
val parallelism = Math.min(Math.max(1, slots.size()), conf.rpcMaxParallelism)
ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
case (workerInfo, (masterLocations, slaveLocations)) =>
@@ -1215,13 +1216,16 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
s" from worker ${workerInfo.readableAddress()}.")
} else {
- logError(s"[reserveSlots] Failed to" +
+ failureInfos.add(s"[reserveSlots] Failed to" +
s" reserve buffers for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
s" from worker ${workerInfo.readableAddress()}. Reason: ${res.reason}")
reserveSlotFailedWorkers.put(workerInfo, (res.status, System.currentTimeMillis()))
}
}
-
+ if (failureInfos.asScala.nonEmpty) {
+ logError(s"Aggregated error of reserveSlots failure:${failureInfos.asScala.foldLeft("")(
+ (x, y) => s"$x \n $y")}")
+ }
recordWorkerFailure(reserveSlotFailedWorkers)
new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
}