You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/22 06:29:59 UTC
[incubator-celeborn] branch main updated: [ISSUE-987][FEATURE] During worker shutdown, return HARD_SPLIT for all existed partition (#988)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5ec278f9 [ISSUE-987][FEATURE] During worker shutdown, return HARD_SPLIT for all existed partition (#988)
5ec278f9 is described below
commit 5ec278f99a63dce5349f9d9cb1bd08c02197f327
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Nov 22 14:29:55 2022 +0800
[ISSUE-987][FEATURE] During worker shutdown, return HARD_SPLIT for all existed partition (#988)
---
.../apache/celeborn/client/ShuffleClientImpl.java | 32 +++++++++++++++++++++-
.../service/deploy/worker/PushDataHandler.scala | 18 ++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1283059d..751ac07f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -942,7 +942,37 @@ public class ShuffleClientImpl extends ShuffleClient {
new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
- callback.onSuccess(response);
+ if (response.remaining() > 0) {
+ byte reason = response.get();
+ if (reason == StatusCode.HARD_SPLIT.getValue()) {
+ logger.info(
+ "Push merged data return hard split for map "
+ + mapId
+ + " attempt "
+ + attemptId
+ + " batches "
+ + Arrays.toString(batchIds)
+ + ".");
+ pushDataRetryPool.submit(
+ () ->
+ submitRetryPushMergedData(
+ pushState,
+ applicationId,
+ shuffleId,
+ mapId,
+ attemptId,
+ batches,
+ StatusCode.HARD_SPLIT,
+ groupedBatchId));
+ } else {
+ // Should not happen in current architecture.
+ response.rewind();
+ logger.error("Push merged data should not receive this response");
+ callback.onSuccess(response);
+ }
+ } else {
+ callback.onSuccess(response);
+ }
}
@Override
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index df5ac743..02dd6a71 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -52,6 +52,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
var workerInfo: WorkerInfo = _
var diskReserveSize: Long = _
var partitionSplitMinimumSize: Long = _
+ var shutdown: AtomicBoolean = _
def init(worker: Worker): Unit = {
workerSource = worker.workerSource
@@ -65,6 +66,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.diskReserveSize
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
+ shutdown = worker.shutdown
logInfo(s"diskReserveSize $diskReserveSize")
}
@@ -168,6 +170,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
return
}
+
+ // During worker shutdown, return HARD_SPLIT for every existing partition.
+ // This must happen before the file-writer exception check so the current push can revive and retry.
+ if (isMaster && shutdown.get()) {
+ logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
+ callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+ return
+ }
+
val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
val exception = fileWriter.getException
if (exception != null) {
@@ -318,6 +329,13 @@ class PushDataHandler extends BaseMessageHandler with Logging {
loc
}
+ // During worker shutdown, return HARD_SPLIT for every existing partition.
+ // This must happen before the file-writer exception check so the current push can revive and retry.
+ if (isMaster && shutdown.get()) {
+ callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+ return
+ }
+
val fileWriters = locations.map(_.asInstanceOf[WorkingPartition].getFileWriter)
val fileWriterWithException = fileWriters.find(_.getException != null)
if (fileWriterWithException.nonEmpty) {