You are viewing a plain-text version of this content. The canonical version is available in the original mailing-list archive (the link was not preserved in this plain-text copy).
Posted to commits@celeborn.apache.org by an...@apache.org on 2023/01/28 10:41:58 UTC

[incubator-celeborn] branch main updated: [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info (#1178)

This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8611a644 [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info (#1178)
8611a644 is described below

commit 8611a6440050553d6d4b0bb410daacc90c7297e3
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Sat Jan 28 18:41:54 2023 +0800

    [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info (#1178)
    
    * [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info
---
 .../apache/celeborn/client/ShuffleClientImpl.java  |  6 +++---
 .../service/deploy/worker/PushDataHandler.scala    | 24 ++++++++++++++--------
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 063fdc40..1de8840e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1380,12 +1380,12 @@ public class ShuffleClientImpl extends ShuffleClient {
   private StatusCode getPushDataFailCause(String message) {
     logger.info("[getPushDataFailCause] message: " + message);
     StatusCode cause;
-    if (StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage().equals(message)) {
+    if (message.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())) {
       cause = StatusCode.PUSH_DATA_FAIL_SLAVE;
-    } else if (StatusCode.PUSH_DATA_FAIL_MASTER.getMessage().equals(message)
+    } else if (message.startsWith(StatusCode.PUSH_DATA_FAIL_MASTER.getMessage())
         || connectFail(message)) {
       cause = StatusCode.PUSH_DATA_FAIL_MASTER;
-    } else if (StatusCode.PUSH_DATA_TIMEOUT.getMessage().equals(message)) {
+    } else if (message.startsWith(StatusCode.PUSH_DATA_TIMEOUT.getMessage())) {
       cause = StatusCode.PUSH_DATA_TIMEOUT;
     } else {
       cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 7c3c96a7..cdcb426a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -169,9 +169,10 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       }
 
       override def onFailure(e: Throwable): Unit = {
-        logError(s"[handlePushData.onFailure] partitionLocation: $location")
+        logError(s"[handlePushData.onFailure] partitionLocation: $location", e)
         workerSource.incCounter(WorkerSource.PushDataFailCount)
-        callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage(), e))
+        callback.onFailure(
+          new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
       }
     }
 
@@ -231,7 +232,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
         } else {
           StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()
         }
-      callback.onFailure(new Exception(message, exception))
+      callback.onFailure(new Exception(s"$message! $location", exception))
       return
     }
     val diskFull =
@@ -267,7 +268,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushData.body().release()
-            wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable!"))
+            wrappedCallback.onFailure(
+              new Exception(s"Peer $peerWorker unavailable for $location!"))
             return
           }
           try {
@@ -283,7 +285,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             case e: Exception =>
               pushData.body().release()
               unavailablePeers.put(peerWorker, System.currentTimeMillis())
-              wrappedCallback.onFailure(e)
+              wrappedCallback.onFailure(
+                new Exception(s"Push data to peer $peerWorker failed for $location", e))
           }
         }
       })
@@ -351,7 +354,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
 
       override def onFailure(e: Throwable): Unit = {
         workerSource.incCounter(WorkerSource.PushDataFailCount)
-        callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+        callback.onFailure(
+          new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
       }
     }
 
@@ -423,7 +427,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
         } else {
           StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()
         }
-      callback.onFailure(new Exception(message, exception))
+      callback.onFailure(new Exception(s"$message! ${locations.head}", exception))
       return
     }
     fileWriters.foreach(_.incrementPendingWrites())
@@ -443,7 +447,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushMergedData.body().release()
-            wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable!"))
+            wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable for $location!"))
             return
           }
           try {
@@ -462,7 +466,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             case e: Exception =>
               pushMergedData.body().release()
               unavailablePeers.put(peerWorker, System.currentTimeMillis())
-              wrappedCallback.onFailure(e)
+              wrappedCallback.onFailure(new Exception(
+                s"Push data to peer $peerWorker failed for $location",
+                e))
           }
         }
       })