Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/23 12:05:41 UTC

[GitHub] [incubator-celeborn] zhongqiangczq opened a new pull request, #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

zhongqiangczq opened a new pull request, #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002

   … handshake/regionstart/regionfinish
   
   # [BUG]/[FEATURE] title
   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   ### What are the items that need reviewer attention?
   
   
   ### Related issues.
   
   
   ### Related pull requests.
   
   
   ### How was this patch tested?
   
   
   /cc @related-reviewer
   @RexXiong @waitinfuture @FMX 
   /assign @main-reviewer
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#discussion_r1036117279


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,233 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());

Review Comment:
   The explicit toString() is unnecessary: the `{}` placeholder already formats the argument.
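Why the explicit call matters can be shown with a small sketch. `MiniLogger` and `CountingLocation` below are hypothetical stand-ins (not Celeborn or SLF4J classes), assuming a placeholder-style logger that formats its arguments only when the level is enabled. Under that assumption, passing the object defers `toString()`, while calling it at the call site always pays for it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a placeholder-style logger; not Celeborn or SLF4J code.
final class MiniLogger {
  private final boolean debugEnabled;
  private String lastMessage;

  MiniLogger(boolean debugEnabled) {
    this.debugEnabled = debugEnabled;
  }

  // Formats only when debug is enabled, so arg.toString() is deferred until needed.
  void debug(String pattern, Object arg) {
    if (!debugEnabled) {
      return;
    }
    lastMessage = pattern.replace("{}", String.valueOf(arg));
  }

  String lastMessage() {
    return lastMessage;
  }
}

// Hypothetical location type that counts how often toString() runs.
final class CountingLocation {
  static final AtomicInteger TO_STRING_CALLS = new AtomicInteger();

  @Override
  public String toString() {
    TO_STRING_CALLS.incrementAndGet();
    return "host-1:9097";
  }
}

final class LazyLogDemo {
  public static void main(String[] args) {
    MiniLogger quiet = new MiniLogger(false);
    CountingLocation location = new CountingLocation();

    // Eager: the string is built at the call site even though debug is off.
    quiet.debug("regionStart location:{}", location.toString());
    // Deferred: the logger never touches the argument while debug is off.
    quiet.debug("regionStart location:{}", location);
    System.out.println("toString calls with debug off: " + CountingLocation.TO_STRING_CALLS.get());

    MiniLogger verbose = new MiniLogger(true);
    verbose.debug("regionStart location:{}", location);
    System.out.println(verbose.lastMessage());
  }
}
```

With debug disabled, only the call site that invokes `toString()` itself increments the counter.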



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,233 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          PushDataHandShake handShake =
+              new PushDataHandShake(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  numPartitions,
+                  bufferSize);
+          client.sendRpcSync(handShake.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  @Override
+  public Optional<PartitionLocation> regionStart(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      int currentRegionIdx,
+      boolean isBroadcast)
+      throws IOException {
+    return sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionStart shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionStart location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          RegionStart regionStart =
+              new RegionStart(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  currentRegionIdx,
+                  isBroadcast);
+          ByteBuffer regionStartResponse =
+              client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          if (regionStartResponse.hasRemaining()
+              && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
+            // if split then revive
+            PbChangeLocationResponse response =
+                driverRssMetaService.askSync(
+                    ControlMessages.Revive$.MODULE$.apply(
+                        applicationId,
+                        shuffleId,
+                        mapId,
+                        attemptId,
+                        location.getId(),
+                        location.getEpoch(),
+                        location,
+                        StatusCode.HARD_SPLIT),
+                    ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
+            // per partitionKey only serve single PartitionLocation in Client Cache.
+            StatusCode respStatus = Utils.toStatusCode(response.getStatus());
+            if (StatusCode.SUCCESS.equals(respStatus)) {
+              return Optional.of(PbSerDeUtils.fromPbPartitionLocation(response.getLocation()));
+            } else if (StatusCode.MAP_ENDED.equals(respStatus)) {
+              final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+              mapperEndMap
+                  .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+                  .add(mapKey);
+              return Optional.empty();
+            } else {
+              // throw exception
+              logger.error(
+                  "Exception raised while reviving for shuffle {} reduce {} epoch {}.",
+                  shuffleId,
+                  location.getId(),
+                  location.getEpoch());
+              throw new IOException("regionStart revive failed");
+            }
+          }
+          return Optional.empty();
+        });
+  }
+
+  @Override
+  public void regionFinish(
+      String applicationId, int shuffleId, int mapId, int attemptId, PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionFinish shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionFinish location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          RegionFinish regionFinish =
+              new RegionFinish(MASTER_MODE, shuffleKey, location.getUniqueId(), attemptId);
+          client.sendRpcSync(regionFinish.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  private <R> R sendMessageInternal(
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      ThrowingExceptionSupplier<R, Exception> supplier)
+      throws IOException {
+    PushState pushState = null;
+    int batchId = 0;
+    try {
+      // mapKey
+      final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+      // return if shuffle stage already ended
+      if (mapperEnded(shuffleId, mapId, attemptId)) {
+        logger.debug(
+            "The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.",
+            shuffleId,
+            mapId,
+            attemptId);
+        return null;
+      }
+      pushState = pushStates.computeIfAbsent(mapKey, (s) -> new PushState(conf));
+      // check limit
+      limitMaxInFlight(mapKey, pushState, maxInFlight);

Review Comment:
   Should we limit to zero here?



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,233 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          PushDataHandShake handShake =
+              new PushDataHandShake(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  numPartitions,
+                  bufferSize);
+          client.sendRpcSync(handShake.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  @Override
+  public Optional<PartitionLocation> regionStart(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      int currentRegionIdx,
+      boolean isBroadcast)
+      throws IOException {
+    return sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionStart shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionStart location:{}", location.toString());

Review Comment:
   Ditto: the explicit toString() is unnecessary.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#discussion_r1032165644


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,230 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          PushDataHandShake handShake =
+              new PushDataHandShake(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  numPartitions,
+                  bufferSize);
+          client.sendRpcSync(handShake.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  @Override
+  public Optional<PartitionLocation> regionStart(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      int currentRegionIdx,
+      boolean isBroadcast)
+      throws IOException {
+    return sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionStart shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionStart location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          RegionStart regionStart =
+              new RegionStart(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  currentRegionIdx,
+                  isBroadcast);
+          ByteBuffer regionStartResponse =
+              client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          if (regionStartResponse.hasRemaining()
+              && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
+            // if split then revive
+            PbChangeLocationResponse response =

Review Comment:
   Use a PartitionSplit request instead of a Revive request to get a new location.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#discussion_r1035499447


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,230 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          PushDataHandShake handShake =
+              new PushDataHandShake(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  numPartitions,
+                  bufferSize);
+          client.sendRpcSync(handShake.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  @Override
+  public Optional<PartitionLocation> regionStart(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      int currentRegionIdx,
+      boolean isBroadcast)
+      throws IOException {
+    return sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionStart shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionStart location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          RegionStart regionStart =
+              new RegionStart(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  currentRegionIdx,
+                  isBroadcast);
+          ByteBuffer regionStartResponse =
+              client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          if (regionStartResponse.hasRemaining()
+              && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
+            // if split then revive
+            PbChangeLocationResponse response =

Review Comment:
   > For the client, SOFT_SPLIT uses a PartitionSplit request, but HARD_SPLIT uses a Revive request. For LifecycleManager, the processing logic is almost the same for PartitionSplit and Revive requests.
   
   OK





[GitHub] [incubator-celeborn] zhongqiangczq closed pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
zhongqiangczq closed pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…
URL: https://github.com/apache/incubator-celeborn/pull/1002




[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
zhongqiangczq commented on code in PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#discussion_r1032259717


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,230 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          PushDataHandShake handShake =
+              new PushDataHandShake(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  numPartitions,
+                  bufferSize);
+          client.sendRpcSync(handShake.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  @Override
+  public Optional<PartitionLocation> regionStart(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      int currentRegionIdx,
+      boolean isBroadcast)
+      throws IOException {
+    return sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionStart shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionStart location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          RegionStart regionStart =
+              new RegionStart(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  currentRegionIdx,
+                  isBroadcast);
+          ByteBuffer regionStartResponse =
+              client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          if (regionStartResponse.hasRemaining()
+              && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
+            // if split then revive
+            PbChangeLocationResponse response =

Review Comment:
   A worker whose disk is full will also send HARD_SPLIT to the client. In that case, should it still use PartitionSplit?





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
zhongqiangczq commented on code in PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#discussion_r1032259717


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1205,4 +1203,230 @@ private boolean connectFail(String message) {
         || (message.equals("Connection reset by peer"))
         || (message.startsWith("Failed to send RPC "));
   }
+
+  @Override
+  public void pushDataHandShake(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numPartitions,
+      int bufferSize,
+      PartitionLocation location)
+      throws IOException {
+    sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("pushDataHandShake location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          PushDataHandShake handShake =
+              new PushDataHandShake(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  numPartitions,
+                  bufferSize);
+          client.sendRpcSync(handShake.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          return null;
+        });
+  }
+
+  @Override
+  public Optional<PartitionLocation> regionStart(
+      String applicationId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      PartitionLocation location,
+      int currentRegionIdx,
+      boolean isBroadcast)
+      throws IOException {
+    return sendMessageInternal(
+        shuffleId,
+        mapId,
+        attemptId,
+        location,
+        () -> {
+          String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
+          logger.info(
+              "regionStart shuffleKey:{}, attemptId:{}, locationId:{}",
+              shuffleKey,
+              attemptId,
+              location.getUniqueId());
+          logger.debug("regionStart location:{}", location.toString());
+          TransportClient client =
+              dataClientFactory.createClient(location.getHost(), location.getPushPort());
+          RegionStart regionStart =
+              new RegionStart(
+                  MASTER_MODE,
+                  shuffleKey,
+                  location.getUniqueId(),
+                  attemptId,
+                  currentRegionIdx,
+                  isBroadcast);
+          ByteBuffer regionStartResponse =
+              client.sendRpcSync(regionStart.toByteBuffer(), conf.pushDataRpcTimeoutMs());
+          if (regionStartResponse.hasRemaining()
+              && regionStartResponse.get() == StatusCode.HARD_SPLIT.getValue()) {
+            // if split then revive
+            PbChangeLocationResponse response =

Review Comment:
   For the client, SOFT_SPLIT uses a PartitionSplit request, but HARD_SPLIT uses a Revive request.
   For LifecycleManager, the processing logic is almost the same for PartitionSplit and Revive requests.
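The rule described in this comment can be sketched as a small mapping. The enum and class names below are hypothetical and chosen only for illustration; the actual Celeborn status codes and control messages may be shaped differently.

```java
// Hypothetical names for illustration; the real Celeborn types and messages may differ.
enum SplitStatus { SOFT_SPLIT, HARD_SPLIT }

enum LocationRequest { PARTITION_SPLIT, REVIVE }

final class SplitRequestChooser {
  // Maps the split status returned by the worker to the request the client sends:
  // SOFT_SPLIT goes through PartitionSplit, HARD_SPLIT goes through Revive.
  static LocationRequest requestFor(SplitStatus status) {
    switch (status) {
      case SOFT_SPLIT:
        return LocationRequest.PARTITION_SPLIT;
      case HARD_SPLIT:
        return LocationRequest.REVIVE;
      default:
        throw new IllegalArgumentException("Unexpected status: " + status);
    }
  }

  public static void main(String[] args) {
    for (SplitStatus status : SplitStatus.values()) {
      System.out.println(status + " -> " + requestFor(status));
    }
  }
}
```

Either request asks for a new location, which fits the remark that the two are handled almost identically on the LifecycleManager side.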
   





[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#issuecomment-1332368803

   Please resolve the conflicts.




[GitHub] [incubator-celeborn] RexXiong commented on pull request #1002: [CELEBORN-11] ShuffleClient supports MapPartition shuffle write: send…

Posted by GitBox <gi...@apache.org>.
RexXiong commented on PR #1002:
URL: https://github.com/apache/incubator-celeborn/pull/1002#issuecomment-1331608160

   A gentle reminder: during graceful worker shutdown/upgrade, the worker just returns HARD_SPLIT when handling a push data message, so we may need to wait for a timeout if the map partition region is not finished. @zhongqiangczq @waitinfuture
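One way to read the suggestion is a bounded wait for the open region before shutdown proceeds. The sketch below assumes a hypothetical `RegionTracker` that is notified when the region finishes; it is not Celeborn's worker code, and the timeout value is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical tracker for one open map partition region; not Celeborn's worker code.
final class RegionTracker {
  private final CountDownLatch finished = new CountDownLatch(1);

  // Called when the regionFinish message for the open region arrives.
  void regionFinished() {
    finished.countDown();
  }

  // Blocks for at most timeoutMs. Returns true if the region finished in time,
  // false on timeout or if the waiting thread was interrupted.
  boolean awaitRegionFinish(long timeoutMs) {
    try {
      return finished.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}

final class GracefulShutdownDemo {
  public static void main(String[] args) throws InterruptedException {
    RegionTracker tracker = new RegionTracker();
    Thread writer = new Thread(tracker::regionFinished);
    writer.start();

    boolean clean = tracker.awaitRegionFinish(1000);
    writer.join();
    System.out.println(clean ? "region finished, safe to shut down" : "timed out, region still open");
  }
}
```

The latch keeps the wait bounded, so a region that never finishes cannot hold up shutdown indefinitely.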

