You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by an...@apache.org on 2023/01/28 10:41:58 UTC
[incubator-celeborn] branch main updated: [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info (#1178)
This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8611a644 [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info (#1178)
8611a644 is described below
commit 8611a6440050553d6d4b0bb410daacc90c7297e3
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Sat Jan 28 18:41:54 2023 +0800
[CELEBORN-237][IMPROVEMENT] push failed error message should show partition info (#1178)
* [CELEBORN-237][IMPROVEMENT] push failed error message should show partition info
---
.../apache/celeborn/client/ShuffleClientImpl.java | 6 +++---
.../service/deploy/worker/PushDataHandler.scala | 24 ++++++++++++++--------
2 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 063fdc40..1de8840e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1380,12 +1380,12 @@ public class ShuffleClientImpl extends ShuffleClient {
private StatusCode getPushDataFailCause(String message) {
logger.info("[getPushDataFailCause] message: " + message);
StatusCode cause;
- if (StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage().equals(message)) {
+ if (message.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())) {
cause = StatusCode.PUSH_DATA_FAIL_SLAVE;
- } else if (StatusCode.PUSH_DATA_FAIL_MASTER.getMessage().equals(message)
+ } else if (message.startsWith(StatusCode.PUSH_DATA_FAIL_MASTER.getMessage())
|| connectFail(message)) {
cause = StatusCode.PUSH_DATA_FAIL_MASTER;
- } else if (StatusCode.PUSH_DATA_TIMEOUT.getMessage().equals(message)) {
+ } else if (message.startsWith(StatusCode.PUSH_DATA_TIMEOUT.getMessage())) {
cause = StatusCode.PUSH_DATA_TIMEOUT;
} else {
cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 7c3c96a7..cdcb426a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -169,9 +169,10 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
override def onFailure(e: Throwable): Unit = {
- logError(s"[handlePushData.onFailure] partitionLocation: $location")
+ logError(s"[handlePushData.onFailure] partitionLocation: $location", e)
workerSource.incCounter(WorkerSource.PushDataFailCount)
- callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage(), e))
+ callback.onFailure(
+ new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
}
}
@@ -231,7 +232,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
} else {
StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()
}
- callback.onFailure(new Exception(message, exception))
+ callback.onFailure(new Exception(s"$message! $location", exception))
return
}
val diskFull =
@@ -267,7 +268,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
- wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable!"))
+ wrappedCallback.onFailure(
+ new Exception(s"Peer $peerWorker unavailable for $location!"))
return
}
try {
@@ -283,7 +285,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
case e: Exception =>
pushData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
- wrappedCallback.onFailure(e)
+ wrappedCallback.onFailure(
+ new Exception(s"Push data to peer $peerWorker failed for $location", e))
}
}
})
@@ -351,7 +354,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
override def onFailure(e: Throwable): Unit = {
workerSource.incCounter(WorkerSource.PushDataFailCount)
- callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+ callback.onFailure(
+ new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
}
}
@@ -423,7 +427,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
} else {
StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()
}
- callback.onFailure(new Exception(message, exception))
+ callback.onFailure(new Exception(s"$message! ${locations.head}", exception))
return
}
fileWriters.foreach(_.incrementPendingWrites())
@@ -443,7 +447,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
- wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable!"))
+ wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable for $location!"))
return
}
try {
@@ -462,7 +466,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
case e: Exception =>
pushMergedData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
- wrappedCallback.onFailure(e)
+ wrappedCallback.onFailure(new Exception(
+ s"Push data to peer $peerWorker failed for $location",
+ e))
}
}
})