You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/12/27 12:41:01 UTC
(incubator-celeborn) branch branch-0.4 updated: [CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 2897a936f [CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task
2897a936f is described below
commit 2897a936f5ac0e94e59ef3fdb36954b46b44aa38
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Dec 27 20:40:40 2023 +0800
[CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task
### What changes were proposed in this pull request?
MapperAttempts for a shuffle reply `MAP_ENDED` when push data or push merged data is received from a speculative task whose mapper has already ended.
Follow-up to #1591.
### Why are the changes needed?
When push data or push merged data is received from a speculative task whose mapper has already ended, the worker's `PushDataHandler` should reply with MapEnd (`MAP_ENDED`) instead of StageEnd (`STAGE_ENDED`). Correspondingly, `ShuffleClientImpl` should treat that reply as MapEnd, recording only that mapper as ended, rather than marking the whole stage as ended. Otherwise, the other tasks of the stage can no longer send their shuffle data, which causes data loss.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal test.
Closes #2190 from SteNicholas/CELEBORN-678.
Authored-by: SteNicholas <pr...@163.com>
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
(cherry picked from commit 3097ffe33b04c89775357865d496476f4ca89d9c)
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 12 ++++++++----
.../celeborn/service/deploy/worker/PushDataHandler.scala | 4 ++--
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index bc60c9fd1..1f2c93f8c 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -954,8 +954,10 @@ public class ShuffleClientImpl extends ShuffleClient {
new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
- if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
- stageEndShuffleSet.add(shuffleId);
+ if (response.remaining() > 0 && response.get() == StatusCode.MAP_ENDED.getValue()) {
+ mapperEndMap
+ .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+ .add(mapId);
}
logger.debug(
"Push data to {} success for shuffle {} map {} attempt {} partition {} batch {}.",
@@ -1350,8 +1352,10 @@ public class ShuffleClientImpl extends ShuffleClient {
groupedBatchId,
Arrays.toString(batchIds));
pushState.removeBatch(groupedBatchId, hostPort);
- if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
- stageEndShuffleSet.add(shuffleId);
+ if (response.remaining() > 0 && response.get() == StatusCode.MAP_ENDED.getValue()) {
+ mapperEndMap
+ .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+ .add(mapId);
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 9beca4c00..134fcab86 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -205,7 +205,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
logInfo(
s"[Case1] Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
s" attempt $attemptId), but this mapper has already been ended.")
- callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+ callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
} else {
logInfo(
s"Receive push data for committed hard split partition of (shuffle $shuffleKey, " +
@@ -470,7 +470,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
s"task(shuffle $shuffleKey, map $mapId, attempt $attemptId), " +
s"but this mapper has already been ended.")
callbackWithTimer.onSuccess(
- ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+ ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
} else {
logInfo(s"[Case1] Receive push merged data for committed hard split partition of " +
s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")