You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/10/31 13:30:15 UTC

(incubator-celeborn) branch main updated: [CELEBORN-1098] Logging worker address with worker failure log

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e02cde0a2 [CELEBORN-1098] Logging worker address with worker failure log
e02cde0a2 is described below

commit e02cde0a22e2511c55ba9757ff1fcee8463d5f14
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Oct 31 21:30:07 2023 +0800

    [CELEBORN-1098] Logging worker address with worker failure log
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    At present, the log does not show which worker's request timed out.
    
    ```
    23/10/30 15:44:51,963 [CommitFiles-ForkJoinPool-162-worker-1] ERROR ReducePartitionCommitHandler: AskSync CommitFiles for 0 failed (attempt 1/4).
    org.apache.celeborn.common.rpc.RpcTimeoutException: Futures timed out after [60000 milliseconds]. This timeout is controlled by celeborn.rpc.askTimeout
            at org.apache.celeborn.common.rpc.RpcTimeout.org$apache$celeborn$common$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:46)
            at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:61)
            at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:57)
            at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
            at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at org.apache.celeborn.common.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
            at org.apache.celeborn.common.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:73)
            at org.apache.celeborn.client.commit.CommitHandler.requestCommitFilesWithRetry(CommitHandler.scala:417)
            at org.apache.celeborn.client.commit.CommitHandler.commitFiles(CommitHandler.scala:279)
            at org.apache.celeborn.client.CommitManager$$anon$1$$anon$2.$anonfun$run$2(CommitManager.scala:151)
            at org.apache.celeborn.client.CommitManager$$anon$1$$anon$2.$anonfun$run$2$adapted(CommitManager.scala:122)
            at org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:293)
            at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
            at scala.util.Success.$anonfun$map$1(Try.scala:255)
            at scala.util.Success.map(Try.scala:213)
            at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
            at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
            at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
            at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
            at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
            at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
            at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
            at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:225)
            at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
            ... 19 more
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2054 from cxzl25/CELEBORN-1098.
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 .../main/scala/org/apache/celeborn/client/LifecycleManager.scala | 9 ++++++---
 .../scala/org/apache/celeborn/client/commit/CommitHandler.scala  | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 515269896..ebc892815 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1053,8 +1053,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
       endpoint.askSync[ReserveSlotsResponse](message, conf.clientRpcReserveSlotsRpcTimeout)
     } catch {
       case e: Exception =>
-        val msg = s"Exception when askSync ReserveSlots for $shuffleKey " +
-          s"on worker $endpoint."
+        val msg =
+          s"Exception when askSync worker(${endpoint.address}) ReserveSlots for $shuffleKey " +
+            s"on worker $endpoint."
         logError(msg, e)
         ReserveSlotsResponse(StatusCode.REQUEST_FAILED, msg + s" ${e.getMessage}")
     }
@@ -1067,7 +1068,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
       endpoint.askSync[DestroyWorkerSlotsResponse](message)
     } catch {
       case e: Exception =>
-        logError(s"AskSync Destroy for ${message.shuffleKey} failed.", e)
+        logError(
+          s"AskSync worker(${endpoint.address}) Destroy for ${message.shuffleKey} failed.",
+          e)
         DestroyWorkerSlotsResponse(
           StatusCode.REQUEST_FAILED,
           message.primaryLocations,
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 03afdc2b7..947546416 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -420,7 +420,7 @@ abstract class CommitHandler(
         case e: Throwable =>
           retryTimes += 1
           logError(
-            s"AskSync CommitFiles for ${message.shuffleId} failed (attempt $retryTimes/$maxRetries).",
+            s"AskSync worker(${endpoint.address}) CommitFiles for ${message.shuffleId} failed (attempt $retryTimes/$maxRetries).",
             e)
       }
     }