You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/10/31 13:30:15 UTC
(incubator-celeborn) branch main updated: [CELEBORN-1098] Logging worker address with worker failure log
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e02cde0a2 [CELEBORN-1098] Logging worker address with worker failure log
e02cde0a2 is described below
commit e02cde0a22e2511c55ba9757ff1fcee8463d5f14
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Oct 31 21:30:07 2023 +0800
[CELEBORN-1098] Logging worker address with worker failure log
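### What changes were proposed in this pull request?
Include the worker address (`endpoint.address`) in the error logs emitted when ReserveSlots, DestroyWorkerSlots, or CommitFiles requests to a worker fail.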
### Why are the changes needed?
Currently, the error log does not show which worker's request timed out, for example:
```text
23/10/30 15:44:51,963 [CommitFiles-ForkJoinPool-162-worker-1] ERROR ReducePartitionCommitHandler: AskSync CommitFiles for 0 failed (attempt 1/4).
org.apache.celeborn.common.rpc.RpcTimeoutException: Futures timed out after [60000 milliseconds]. This timeout is controlled by celeborn.rpc.askTimeout
at org.apache.celeborn.common.rpc.RpcTimeout.org$apache$celeborn$common$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:46)
at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:61)
at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:57)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.celeborn.common.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
at org.apache.celeborn.common.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:73)
at org.apache.celeborn.client.commit.CommitHandler.requestCommitFilesWithRetry(CommitHandler.scala:417)
at org.apache.celeborn.client.commit.CommitHandler.commitFiles(CommitHandler.scala:279)
at org.apache.celeborn.client.CommitManager$$anon$1$$anon$2.$anonfun$run$2(CommitManager.scala:151)
at org.apache.celeborn.client.CommitManager$$anon$1$$anon$2.$anonfun$run$2$adapted(CommitManager.scala:122)
at org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:293)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:225)
at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
... 19 more
```
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2054 from cxzl25/CELEBORN-1098.
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
.../main/scala/org/apache/celeborn/client/LifecycleManager.scala | 9 ++++++---
.../scala/org/apache/celeborn/client/commit/CommitHandler.scala | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 515269896..ebc892815 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1053,8 +1053,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
endpoint.askSync[ReserveSlotsResponse](message, conf.clientRpcReserveSlotsRpcTimeout)
} catch {
case e: Exception =>
- val msg = s"Exception when askSync ReserveSlots for $shuffleKey " +
- s"on worker $endpoint."
+ val msg =
+ s"Exception when askSync worker(${endpoint.address}) ReserveSlots for $shuffleKey " +
+ s"on worker $endpoint."
logError(msg, e)
ReserveSlotsResponse(StatusCode.REQUEST_FAILED, msg + s" ${e.getMessage}")
}
@@ -1067,7 +1068,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
endpoint.askSync[DestroyWorkerSlotsResponse](message)
} catch {
case e: Exception =>
- logError(s"AskSync Destroy for ${message.shuffleKey} failed.", e)
+ logError(
+ s"AskSync worker(${endpoint.address}) Destroy for ${message.shuffleKey} failed.",
+ e)
DestroyWorkerSlotsResponse(
StatusCode.REQUEST_FAILED,
message.primaryLocations,
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 03afdc2b7..947546416 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -420,7 +420,7 @@ abstract class CommitHandler(
case e: Throwable =>
retryTimes += 1
logError(
- s"AskSync CommitFiles for ${message.shuffleId} failed (attempt $retryTimes/$maxRetries).",
+ s"AskSync worker(${endpoint.address}) CommitFiles for ${message.shuffleId} failed (attempt $retryTimes/$maxRetries).",
e)
}
}