Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/22 06:29:59 UTC

[incubator-celeborn] branch main updated: [ISSUE-987][FEATURE] During worker shutdown, return HARD_SPLIT for all existed partition (#988)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ec278f9 [ISSUE-987][FEATURE] During worker shutdown, return HARD_SPLIT for all existed partition (#988)
5ec278f9 is described below

commit 5ec278f99a63dce5349f9d9cb1bd08c02197f327
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Nov 22 14:29:55 2022 +0800

    [ISSUE-987][FEATURE] During worker shutdown, return HARD_SPLIT for all existed partition (#988)
---
 .../apache/celeborn/client/ShuffleClientImpl.java  | 32 +++++++++++++++++++++-
 .../service/deploy/worker/PushDataHandler.scala    | 18 ++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)
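For readers skimming the diff below: while shutting down, the worker now answers in-flight pushes with a single status byte, and the client reads that byte to decide whether to revive and retry. A minimal sketch of that one-byte reply, with an illustrative stand-in for Celeborn's StatusCode enum (the byte values are assumptions, not the real codes):

    import java.nio.ByteBuffer;

    // Minimal sketch of the one-byte status reply used by this commit. The enum is a
    // stand-in for Celeborn's StatusCode; the byte values below are illustrative
    // assumptions, not the project's real codes.
    enum SketchStatus {
      SUCCESS((byte) 0),
      HARD_SPLIT((byte) 1);

      final byte value;

      SketchStatus(byte value) {
        this.value = value;
      }
    }

    class StatusByteSketch {
      // Worker side: wrap the status in a one-byte buffer, mirroring
      // ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)) in the diff.
      static ByteBuffer encode(SketchStatus status) {
        return ByteBuffer.wrap(new byte[] {status.value});
      }

      // Client side: an empty response still means plain success; otherwise the
      // first byte carries the reason, mirroring response.remaining() > 0 / get().
      static SketchStatus decode(ByteBuffer response) {
        if (response.remaining() == 0) {
          return SketchStatus.SUCCESS;
        }
        return response.get(0) == SketchStatus.HARD_SPLIT.value
            ? SketchStatus.HARD_SPLIT
            : SketchStatus.SUCCESS;
      }
    }

The real protocol lives in StatusCode and the transport layer; this sketch only shows the encode/decode shape the diff relies on.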

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1283059d..751ac07f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -942,7 +942,37 @@ public class ShuffleClientImpl extends ShuffleClient {
         new RpcResponseCallback() {
           @Override
           public void onSuccess(ByteBuffer response) {
-            callback.onSuccess(response);
+            if (response.remaining() > 0) {
+              byte reason = response.get();
+              if (reason == StatusCode.HARD_SPLIT.getValue()) {
+                logger.info(
+                    "Push merged data return hard split for map "
+                        + mapId
+                        + " attempt "
+                        + attemptId
+                        + " batches "
+                        + Arrays.toString(batchIds)
+                        + ".");
+                pushDataRetryPool.submit(
+                    () ->
+                        submitRetryPushMergedData(
+                            pushState,
+                            applicationId,
+                            shuffleId,
+                            mapId,
+                            attemptId,
+                            batches,
+                            StatusCode.HARD_SPLIT,
+                            groupedBatchId));
+              } else {
+                // Should not happen in current architecture.
+                response.rewind();
+                logger.error("Push merged data should not receive this response");
+                callback.onSuccess(response);
+              }
+            } else {
+              callback.onSuccess(response);
+            }
           }
 
           @Override
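The client-side hunk above reduces to: when a push-merged-data RPC succeeds but the response carries a HARD_SPLIT byte, re-enqueue the batches on the retry pool instead of completing the caller's callback. A simplified sketch of that branch, with the retry pool and the resubmission reduced to hypothetical stand-ins (the real code calls submitRetryPushMergedData with the original batches and groupedBatchId):

    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    // Simplified sketch of the onSuccess branch added to ShuffleClientImpl. The
    // HARD_SPLIT byte, the retry pool, and the resubmit Runnable are stand-ins,
    // not the real Celeborn API.
    class PushMergedDataCallbackSketch {
      static final byte HARD_SPLIT = 1; // assumed value, for illustration only
      static final ExecutorService pushDataRetryPool = Executors.newSingleThreadExecutor();

      static void onSuccess(ByteBuffer response, Runnable resubmit, Consumer<ByteBuffer> callback) {
        if (response.remaining() > 0 && response.get(0) == HARD_SPLIT) {
          // The worker asked for a split (e.g. it is shutting down), so push the
          // merged data again asynchronously instead of completing the callback.
          pushDataRetryPool.submit(resubmit);
        } else {
          // Empty or unrecognized response: hand it to the original callback.
          callback.accept(response);
        }
      }
    }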
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index df5ac743..02dd6a71 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -52,6 +52,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
   var workerInfo: WorkerInfo = _
   var diskReserveSize: Long = _
   var partitionSplitMinimumSize: Long = _
+  var shutdown: AtomicBoolean = _
 
   def init(worker: Worker): Unit = {
     workerSource = worker.workerSource
@@ -65,6 +66,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
     workerInfo = worker.workerInfo
     diskReserveSize = worker.conf.diskReserveSize
     partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
+    shutdown = worker.shutdown
 
     logInfo(s"diskReserveSize $diskReserveSize")
   }
@@ -168,6 +170,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       }
       return
     }
+
+    // During worker shutdown, the worker returns HARD_SPLIT for all existing partitions.
+    // This check must come before returning the exception so the current push data can revive and retry.
+    if (isMaster && shutdown.get()) {
+      logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
+      callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+      return
+    }
+
     val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
     val exception = fileWriter.getException
     if (exception != null) {
@@ -318,6 +329,13 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       loc
     }
 
+    // During worker shutdown, the worker returns HARD_SPLIT for all existing partitions.
+    // This check must come before returning the exception so the current push data can revive and retry.
+    if (isMaster && shutdown.get()) {
+      callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+      return
+    }
+
     val fileWriters = locations.map(_.asInstanceOf[WorkingPartition].getFileWriter)
     val fileWriterWithException = fileWriters.find(_.getException != null)
     if (fileWriterWithException.nonEmpty) {
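
The worker-side change is a guard placed early in both push handlers: once the worker's shutdown flag is set, the master replica answers every push for an existing partition with the HARD_SPLIT byte and returns before touching the file writer. A condensed sketch of that guard, with the handler context reduced to hypothetical parameters:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;

    // Condensed sketch of the shutdown guard added to PushDataHandler. The shutdown
    // flag stands in for worker.shutdown from the diff; isMaster and the callback
    // are hypothetical parameters rather than the handler's real fields.
    class ShutdownGuardSketch {
      static final byte HARD_SPLIT = 1; // assumed value, for illustration only
      final AtomicBoolean shutdown = new AtomicBoolean(false);

      // Returns true when the handler should stop processing the push: the check
      // runs before any file-writer exception is surfaced, so in-flight pushes can
      // still revive the partition elsewhere and retry instead of failing outright.
      boolean replyHardSplitIfShuttingDown(boolean isMaster, Consumer<ByteBuffer> callback) {
        if (isMaster && shutdown.get()) {
          callback.accept(ByteBuffer.wrap(new byte[] {HARD_SPLIT}));
          return true;
        }
        return false;
      }
    }

In the actual handler the flag is the AtomicBoolean assigned from worker.shutdown in init(), as the diff shows.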