You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/16 03:55:48 UTC

[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

FMX commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023470930


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
+    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+        registerShuffle(
+            shuffleId,
+            numMappers,
+            numMappers,
+            () ->
+                driverRssMetaService.askSync(
+                    RegisterMapPartitionTask$.MODULE$.apply(
+                        appId, shuffleId, numMappers, mapId, attemptId, partitionId),
+                    ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+    return partitionLocationMap.get(partitionId);
+  }
+
+  public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
+      int shuffleId,
+      int numMappers,
+      int numPartitions,
+      Callable<PbRegisterShuffleResponse> callable) {

Review Comment:
   I think this parameter could be replaced by a PbRegisterShuffleResponse. There is no need to pass a Callable here.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
+    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+        registerShuffle(
+            shuffleId,
+            numMappers,
+            numMappers,
+            () ->
+                driverRssMetaService.askSync(
+                    RegisterMapPartitionTask$.MODULE$.apply(
+                        appId, shuffleId, numMappers, mapId, attemptId, partitionId),
+                    ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+    return partitionLocationMap.get(partitionId);
+  }
+
+  public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(

Review Comment:
   Maybe rename this to registerShuffleInternal? Having several methods named registerShuffle is getting confusing.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(

Review Comment:
   Could this be merged into registerMapPartitionTask by adding a parameter?



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -379,6 +403,34 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       shuffleId: Int,
       numMappers: Int,
       numReducers: Int): Unit = {
+    handleOfferAndReserveSlots(context, applicationId, shuffleId, numMappers, numReducers)
+  }
+
+  private def handleRegisterMapPartitionTask(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      numMappers: Int,
+      attemptId: Int,

Review Comment:
   This parameter is unused. Maybe it can be deleted.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -53,7 +54,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   private val pushReplicateEnabled = conf.pushReplicateEnabled
   private val partitionSplitThreshold = conf.partitionSplitThreshold
   private val partitionSplitMode = conf.partitionSplitMode
-  private val partitionType = conf.shufflePartitionType
+  // shuffle id -> partition type
+  private val shufflePartitionType = new ConcurrentHashMap[Int, PartitionType]()

Review Comment:
   Is this for future use? Will a single application have both map partitions and reduce partitions?



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -110,6 +111,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
     return hdfsFs;
   }
 
+  public abstract PartitionLocation registerMapPartitionTask(

Review Comment:
   It looks like nothing uses the shuffle client instance directly. Why should this method be added here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org