You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/12/27 12:41:01 UTC

(incubator-celeborn) branch branch-0.4 updated: [CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 2897a936f [CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task
2897a936f is described below

commit 2897a936f5ac0e94e59ef3fdb36954b46b44aa38
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Dec 27 20:40:40 2023 +0800

    [CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task
    
    ### What changes were proposed in this pull request?
    
    When push data or push merged data is received from a speculative task whose mapper has already ended, the worker now replies with `MAP_ENDED` instead of `STAGE_ENDED`.
    
    Follow up #1591.
    
    ### Why are the changes needed?
    
    When push data or push merged data is received from a speculative task whose mapper has already ended, `PushDataHandler` should signal MapEnd rather than StageEnd to the client. Correspondingly, `ShuffleClientImpl` should treat a `MAP_ENDED` response as the end of that single mapper (recording it in `mapperEndMap`) rather than marking the whole shuffle stage as ended. Otherwise, the other tasks of the same stage can no longer push their shuffle data, which causes data loss.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal test.
    
    Closes #2190 from SteNicholas/CELEBORN-678.
    
    Authored-by: SteNicholas <pr...@163.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
    (cherry picked from commit 3097ffe33b04c89775357865d496476f4ca89d9c)
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 .../java/org/apache/celeborn/client/ShuffleClientImpl.java   | 12 ++++++++----
 .../celeborn/service/deploy/worker/PushDataHandler.scala     |  4 ++--
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index bc60c9fd1..1f2c93f8c 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -954,8 +954,10 @@ public class ShuffleClientImpl extends ShuffleClient {
           new RpcResponseCallback() {
             @Override
             public void onSuccess(ByteBuffer response) {
-              if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
-                stageEndShuffleSet.add(shuffleId);
+              if (response.remaining() > 0 && response.get() == StatusCode.MAP_ENDED.getValue()) {
+                mapperEndMap
+                    .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+                    .add(mapId);
               }
               logger.debug(
                   "Push data to {} success for shuffle {} map {} attempt {} partition {} batch {}.",
@@ -1350,8 +1352,10 @@ public class ShuffleClientImpl extends ShuffleClient {
                 groupedBatchId,
                 Arrays.toString(batchIds));
             pushState.removeBatch(groupedBatchId, hostPort);
-            if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
-              stageEndShuffleSet.add(shuffleId);
+            if (response.remaining() > 0 && response.get() == StatusCode.MAP_ENDED.getValue()) {
+              mapperEndMap
+                  .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+                  .add(mapId);
             }
           }
 
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 9beca4c00..134fcab86 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -205,7 +205,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
           logInfo(
             s"[Case1] Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
               s" attempt $attemptId), but this mapper has already been ended.")
-          callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+          callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
         } else {
           logInfo(
             s"Receive push data for committed hard split partition of (shuffle $shuffleKey, " +
@@ -470,7 +470,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
               s"task(shuffle $shuffleKey, map $mapId, attempt $attemptId), " +
               s"but this mapper has already been ended.")
             callbackWithTimer.onSuccess(
-              ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+              ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
           } else {
             logInfo(s"[Case1] Receive push merged data for committed hard split partition of " +
               s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")