You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by an...@apache.org on 2023/01/30 09:47:27 UTC
[incubator-celeborn] branch main updated: [CELEBORN-243][CELEBORN-245][IMPROVEMENT] Create push client failed and connection failed cause push failed should have their own ERROR type (#1181)
This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1311fb53 [CELEBORN-243][CELEBORN-245][IMPROVEMENT] Create push client failed and connection failed cause push failed should have their own ERROR type (#1181)
1311fb53 is described below
commit 1311fb53d1c959949007ee3751bf866457a1b74f
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Jan 30 17:47:22 2023 +0800
[CELEBORN-243][CELEBORN-245][IMPROVEMENT] Create push client failed and connection failed cause push failed should have their own ERROR type (#1181)
* [CELEBORN-243][IMPROVEMENT] Create push client failed should have an ERROR type
---
.../apache/celeborn/client/ShuffleClientImpl.java | 74 ++++++++++++++++++----
.../apache/celeborn/client/LifecycleManager.scala | 22 ++++++-
.../common/protocol/message/StatusCode.java | 14 +++-
.../org/apache/celeborn/common/util/Utils.scala | 8 +++
.../service/deploy/worker/PushDataHandler.scala | 66 +++++++++++++------
5 files changed, 150 insertions(+), 34 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1de8840e..38cf7b68 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -739,12 +739,22 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void onFailure(Throwable e) {
+ StatusCode cause = getPushDataFailCause(e.getMessage());
+
if (pushState.exception.get() != null) {
return;
}
if (remainReviveTimes <= 0) {
- callback.onFailure(e);
+ callback.onFailure(
+ new Exception(
+ cause.getMessage()
+ + "! Push data to master worker of "
+ + loc
+ + " failed: "
+ + e.getMessage(),
+ e));
+
return;
}
@@ -771,7 +781,7 @@ public class ShuffleClientImpl extends ShuffleClient {
loc,
this,
pushState,
- getPushDataFailCause(e.getMessage()),
+ cause,
remainReviveTimes));
} else {
pushState.removeBatch(nextBatchId, loc.hostAndPushPort());
@@ -793,12 +803,20 @@ public class ShuffleClientImpl extends ShuffleClient {
ChannelFuture future = client.pushData(pushData, wrappedCallback);
pushState.pushStarted(nextBatchId, future, wrappedCallback, loc.hostAndPushPort());
} else {
- throw new RuntimeException("Mock push data first time failed.");
+ wrappedCallback.onFailure(
+ new Exception(
+ StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.toString(),
+ new RuntimeException("Mock push data first time failed.")));
}
} catch (Exception e) {
logger.warn("PushData failed", e);
wrappedCallback.onFailure(
- new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
+ new Exception(
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
+ + "! "
+ + e.getMessage()
+ + ". "
+ + loc));
}
} else {
// add batch data
@@ -1079,11 +1097,19 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void onFailure(Throwable e) {
+ StatusCode cause = getPushDataFailCause(e.getMessage());
if (pushState.exception.get() != null) {
return;
}
if (remainReviveTimes <= 0) {
- callback.onFailure(e);
+ callback.onFailure(
+ new Exception(
+ cause.getMessage()
+ + "! Push data to master worker of "
+ + hostPort
+ + " failed: "
+ + e.getMessage(),
+ e));
return;
}
logger.error(
@@ -1109,7 +1135,7 @@ public class ShuffleClientImpl extends ShuffleClient {
mapId,
attemptId,
batches,
- getPushDataFailCause(e.getMessage()),
+ cause,
groupedBatchId,
remainReviveTimes - 1));
}
@@ -1123,11 +1149,20 @@ public class ShuffleClientImpl extends ShuffleClient {
ChannelFuture future = client.pushMergedData(mergedData, wrappedCallback);
pushState.pushStarted(groupedBatchId, future, wrappedCallback, hostPort);
} else {
- throw new RuntimeException("Mock push merge data failed");
+ wrappedCallback.onFailure(
+ new Exception(
+ StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.toString(),
+ new RuntimeException("Mock push merge data failed.")));
}
} catch (Exception e) {
logger.warn("PushMergedData failed", e);
- wrappedCallback.onFailure(new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
+ wrappedCallback.onFailure(
+ new Exception(
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
+ + "! "
+ + e.getMessage()
+ + ". "
+ + hostPort));
}
}
@@ -1382,11 +1417,22 @@ public class ShuffleClientImpl extends ShuffleClient {
StatusCode cause;
if (message.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())) {
cause = StatusCode.PUSH_DATA_FAIL_SLAVE;
- } else if (message.startsWith(StatusCode.PUSH_DATA_FAIL_MASTER.getMessage())
- || connectFail(message)) {
+ } else if (message.startsWith(StatusCode.PUSH_DATA_FAIL_MASTER.getMessage())) {
cause = StatusCode.PUSH_DATA_FAIL_MASTER;
+ } else if (message.startsWith(
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage())) {
+ cause = StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER;
+ } else if (message.startsWith(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage())) {
+ cause = StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE;
+ } else if (message.startsWith(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER.getMessage())) {
+ cause = StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER;
+ } else if (message.startsWith(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getMessage())) {
+ cause = StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE;
} else if (message.startsWith(StatusCode.PUSH_DATA_TIMEOUT.getMessage())) {
cause = StatusCode.PUSH_DATA_TIMEOUT;
+ } else if (connectFail(message)) {
+ // Throw when push to master worker connection causeException.
+ cause = StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER;
} else {
cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
}
@@ -1516,7 +1562,13 @@ public class ShuffleClientImpl extends ShuffleClient {
pushState.pushStarted(nextBatchId, future, callback, location.hostAndPushPort());
} catch (Exception e) {
logger.warn("PushData byteBuf failed", e);
- callback.onFailure(new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
+ callback.onFailure(
+ new Exception(
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
+ + "! "
+ + e.getMessage()
+ + ". "
+ + location));
}
return totalLength;
}
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 073b15bb..1f1fda0b 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -533,6 +533,20 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
case StatusCode.PUSH_DATA_FAIL_SLAVE
if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
blacklistPartitionWorker(oldPartition.getPeer, StatusCode.PUSH_DATA_FAIL_SLAVE)
+ case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
+ blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
+ case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
+ if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+ blacklistPartitionWorker(
+ oldPartition.getPeer,
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
+ case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
+ blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
+ case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
+ if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+ blacklistPartitionWorker(
+ oldPartition.getPeer,
+ StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
case _ =>
}
}
@@ -1071,7 +1085,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
.filter { case (_, entry) =>
val (statusCode, registerTime) = entry
statusCode match {
- case StatusCode.WORKER_SHUTDOWN | StatusCode.NO_AVAILABLE_WORKING_DIR | StatusCode.RESERVE_SLOTS_FAILED
+ case StatusCode.WORKER_SHUTDOWN |
+ StatusCode.NO_AVAILABLE_WORKING_DIR |
+ StatusCode.RESERVE_SLOTS_FAILED |
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER |
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE |
+ StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER |
+ StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
if current - registerTime < workerExcludedExpireTimeout =>
true
case StatusCode.UNKNOWN_WORKER => true
diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 75068a87..bdea3983 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -68,7 +68,11 @@ public enum StatusCode {
REGION_FINISH_FAIL_SLAVE(36),
REGION_FINISH_FAIL_MASTER(37),
- PUSH_DATA_TIMEOUT(38);
+ PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER(38),
+ PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE(39),
+ PUSH_DATA_CONNECTION_EXCEPTION_MASTER(40),
+ PUSH_DATA_CONNECTION_EXCEPTION_SLAVE(41),
+ PUSH_DATA_TIMEOUT(42);
private final byte value;
@@ -87,6 +91,14 @@ public enum StatusCode {
msg = "PushDataFailMaster";
} else if (value == PUSH_DATA_FAIL_SLAVE.getValue()) {
msg = "PushDataFailSlave";
+ } else if (value == PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getValue()) {
+ msg = "PushDataCreateConnectionFailMaster";
+ } else if (value == PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getValue()) {
+ msg = "PushDataCreateConnectionFailSlave";
+ } else if (value == PUSH_DATA_CONNECTION_EXCEPTION_MASTER.getValue()) {
+ msg = "PushDataConnectionExceptionMaster";
+ } else if (value == PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getValue()) {
+ msg = "PushDataConnectionExceptionSlave";
} else if (value == PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.getValue()) {
msg = "PushDataFailNonCriticalCause";
} else if (value == PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getValue()) {
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 875b75fd..31a716dd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -894,6 +894,14 @@ object Utils extends Logging {
StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED
case 31 =>
StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED
+ case 38 =>
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER
+ case 39 =>
+ StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
+ case 40 =>
+ StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER
+ case 41 =>
+ StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
case _ =>
null
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index cdcb426a..ccc254c5 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -171,8 +171,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
override def onFailure(e: Throwable): Unit = {
logError(s"[handlePushData.onFailure] partitionLocation: $location", e)
workerSource.incCounter(WorkerSource.PushDataFailCount)
- callback.onFailure(
- new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
+ // Throw by slave peer worker
+ if (e.getMessage.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage)) {
+ callback.onFailure(e)
+ } else {
+ // Throw by connection
+ callback.onFailure(new Exception(
+ s"${StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getMessage}! Push data to peer of $location failed: ${e.getMessage}",
+ e))
+ }
}
}
@@ -268,8 +275,10 @@ class PushDataHandler extends BaseMessageHandler with Logging {
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
- wrappedCallback.onFailure(
- new Exception(s"Peer $peerWorker unavailable for $location!"))
+ workerSource.incCounter(WorkerSource.PushDataFailCount)
+ callback.onFailure(
+ new Exception(
+ s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Peer $peerWorker unavailable for $location!"))
return
}
try {
@@ -285,8 +294,11 @@ class PushDataHandler extends BaseMessageHandler with Logging {
case e: Exception =>
pushData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
- wrappedCallback.onFailure(
- new Exception(s"Push data to peer $peerWorker failed for $location", e))
+ workerSource.incCounter(WorkerSource.PushDataFailCount)
+ callback.onFailure(
+ new Exception(
+ s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Create connection to peer $peerWorker failed for $location",
+ e))
}
}
})
@@ -334,6 +346,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
return
}
+ val locations = pushMergedData.partitionUniqueIds.map { id =>
+ if (isMaster) {
+ partitionLocationInfo.getMasterLocation(shuffleKey, id)
+ } else {
+ partitionLocationInfo.getSlaveLocation(shuffleKey, id)
+ }
+ }
+
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
if (isMaster) {
@@ -354,19 +374,20 @@ class PushDataHandler extends BaseMessageHandler with Logging {
override def onFailure(e: Throwable): Unit = {
workerSource.incCounter(WorkerSource.PushDataFailCount)
- callback.onFailure(
- new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
+ // Throw by slave peer worker
+ if (e.getMessage.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage)) {
+ callback.onFailure(e)
+ } else {
+ // Throw by connection
+ callback.onFailure(new Exception(
+ s"${StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getMessage}! Push data to peer of ${locations.head} failed: ${e.getMessage}",
+ e))
+ }
}
}
// find FileWriters responsible for the data
- val locations = pushMergedData.partitionUniqueIds.map { id =>
- val loc =
- if (isMaster) {
- partitionLocationInfo.getMasterLocation(shuffleKey, id)
- } else {
- partitionLocationInfo.getSlaveLocation(shuffleKey, id)
- }
+ locations.foreach { loc =>
if (loc == null) {
val (mapId, attemptId) = getMapAttempt(body)
// MapperAttempts for a shuffle exists after any CommitFiles request succeeds.
@@ -397,7 +418,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
} else {
val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId," +
- s" attempt $attemptId, uniqueId $id)."
+ s" attempt $attemptId, uniqueId ${loc.getUniqueId})."
logWarning(s"[handlePushMergedData] $msg")
callback.onFailure(
new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
@@ -405,7 +426,6 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
return
}
- loc
}
// During worker shutdown, worker will return HARD_SPLIT for all existed partition.
@@ -447,7 +467,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
- wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable for $location!"))
+ workerSource.incCounter(WorkerSource.PushDataFailCount)
+ callback.onFailure(new Exception(
+ s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Peer $peerWorker unavailable for $location!"))
return
}
try {
@@ -466,9 +488,11 @@ class PushDataHandler extends BaseMessageHandler with Logging {
case e: Exception =>
pushMergedData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
- wrappedCallback.onFailure(new Exception(
- s"Push data to peer $peerWorker failed for $location",
- e))
+ workerSource.incCounter(WorkerSource.PushDataFailCount)
+ callback.onFailure(
+ new Exception(
+ s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Create connection to peer $peerWorker failed for $location",
+ e))
}
}
})