Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/15 12:21:25 UTC

[GitHub] [incubator-celeborn] RexXiong opened a new pull request, #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

RexXiong opened a new pull request, #973:
URL: https://github.com/apache/incubator-celeborn/pull/973

   close #952 https://github.com/apache/incubator-celeborn/issues/952
   
   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### What are the items that need reviewer attention?
   
   
   ### Related issues.
   
   
   ### Related pull requests.
   
   
   ### How was this patch tested?
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
FMX commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023470930


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
+    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+        registerShuffle(
+            shuffleId,
+            numMappers,
+            numMappers,
+            () ->
+                driverRssMetaService.askSync(
+                    RegisterMapPartitionTask$.MODULE$.apply(
+                        appId, shuffleId, numMappers, mapId, attemptId, partitionId),
+                    ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+    return partitionLocationMap.get(partitionId);
+  }
+
+  public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
+      int shuffleId,
+      int numMappers,
+      int numPartitions,
+      Callable<PbRegisterShuffleResponse> callable) {

Review Comment:
   I think this might be replaced by PbRegisterShuffleResponse. There is no need to add a Callable object here.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
+    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+        registerShuffle(
+            shuffleId,
+            numMappers,
+            numMappers,
+            () ->
+                driverRssMetaService.askSync(
+                    RegisterMapPartitionTask$.MODULE$.apply(
+                        appId, shuffleId, numMappers, mapId, attemptId, partitionId),
+                    ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+    return partitionLocationMap.get(partitionId);
+  }
+
+  public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(

Review Comment:
   Maybe registerShuffleInternal? The register shuffle methods are becoming confusing.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(

Review Comment:
   Can it be merged into registerMapPartitionTask by adding a parameter?



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -379,6 +403,34 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       shuffleId: Int,
       numMappers: Int,
       numReducers: Int): Unit = {
+    handleOfferAndReserveSlots(context, applicationId, shuffleId, numMappers, numReducers)
+  }
+
+  private def handleRegisterMapPartitionTask(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      numMappers: Int,
+      attemptId: Int,

Review Comment:
   Unused. Maybe it can be deleted.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -53,7 +54,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   private val pushReplicateEnabled = conf.pushReplicateEnabled
   private val partitionSplitThreshold = conf.partitionSplitThreshold
   private val partitionSplitMode = conf.partitionSplitMode
-  private val partitionType = conf.shufflePartitionType
+  // shuffle id -> partition type
+  private val shufflePartitionType = new ConcurrentHashMap[Int, PartitionType]()

Review Comment:
   Is this for the future? Will an application have both map partitions and reduce partitions?



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -110,6 +111,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
     return hdfsFs;
   }
 
+  public abstract PartitionLocation registerMapPartitionTask(

Review Comment:
   It looks like there is no place that uses the shuffle client instance directly, so why should this method be added here?





[GitHub] [incubator-celeborn] waitinfuture merged pull request #973: [CELEBORN-8] [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
waitinfuture merged PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973




[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #973: [CELEBORN-8] [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1024659142


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -394,7 +444,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             .values()
             .asScala
             .flatMap(_.getAllMasterLocationsWithMinEpoch(shuffleId.toString).asScala)
-            .filter(_.getEpoch == 0)
+            .filter(p =>
+              (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || (partitionType == PartitionType.MAP && p.getId == partitionId))

Review Comment:
   No, but for map partitions there is only one partition location ID per map task.
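The per-task ID mentioned above comes from `PackedPartitionId.packedPartitionId(mapId, attemptId)` in the diff. The sketch below shows one plausible way to pack both values into a single `int`; the class name `PackedIdSketch` and the 24-bit/8-bit split are assumptions for illustration, not Celeborn's confirmed layout.

```java
// Hypothetical sketch of packing (mapId, attemptId) into one int partition ID.
// ASSUMPTION: low 24 bits = mapId, high 8 bits = attemptId; Celeborn's actual
// PackedPartitionId layout may differ.
final class PackedIdSketch {
  static final int MAP_ID_BITS = 24;
  static final int MAX_MAP_ID = (1 << MAP_ID_BITS) - 1; // 16_777_215
  static final int MAX_ATTEMPT_ID = (1 << (Integer.SIZE - MAP_ID_BITS)) - 1; // 255

  static int pack(int mapId, int attemptId) {
    if (mapId < 0 || mapId > MAX_MAP_ID) {
      throw new IllegalArgumentException("mapId out of range: " + mapId);
    }
    if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
      throw new IllegalArgumentException("attemptId out of range: " + attemptId);
    }
    return (attemptId << MAP_ID_BITS) | mapId;
  }

  static int mapId(int packed) {
    return packed & MAX_MAP_ID;
  }

  static int attemptId(int packed) {
    // Unsigned shift so attempt IDs >= 128 (which set the sign bit) unpack correctly.
    return packed >>> MAP_ID_BITS;
  }

  public static void main(String[] args) {
    int first = pack(5, 0);
    int retry = pack(5, 1);
    System.out.println(first); // 5
    System.out.println(retry); // 16777221
    System.out.println(mapId(retry) + " " + attemptId(retry)); // 5 1
  }
}
```

Under this assumed layout each (mapId, attemptId) pair maps to a distinct ID, so the lifecycle manager can pick out exactly one location per map task attempt.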





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023483653


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(

Review Comment:
   Separating these scenarios looks like the better approach.
   





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023485344


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -110,6 +111,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
     return hdfsFs;
   }
 
+  public abstract PartitionLocation registerMapPartitionTask(

Review Comment:
   It is there for testing (VisibleForTesting), or the writer can call it directly.
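Declaring `registerMapPartitionTask` on the abstract `ShuffleClient` lets callers depend only on the abstract type, so a test can swap in a fake. A minimal sketch, assuming hypothetical trimmed-down classes (`ShuffleClientSketch`, `FakeShuffleClient`, `MapPartitionWriterSketch`) and an `int` location ID in place of `PartitionLocation`:

```java
// Hypothetical, trimmed-down shapes; only the method under discussion is kept,
// and an int location ID stands in for PartitionLocation.
abstract class ShuffleClientSketch {
  abstract int registerMapPartitionTask(
      String appId, int shuffleId, int numMappers, int mapId, int attemptId);
}

// Test double: answers locally instead of sending an RPC to the LifecycleManager.
class FakeShuffleClient extends ShuffleClientSketch {
  @Override
  int registerMapPartitionTask(
      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
    return mapId; // pretend the location ID equals the map ID
  }
}

// A writer that only knows the abstract client type.
class MapPartitionWriterSketch {
  private final ShuffleClientSketch client;

  MapPartitionWriterSketch(ShuffleClientSketch client) {
    this.client = client;
  }

  int open(String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
    return client.registerMapPartitionTask(appId, shuffleId, numMappers, mapId, attemptId);
  }

  public static void main(String[] args) {
    MapPartitionWriterSketch writer = new MapPartitionWriterSketch(new FakeShuffleClient());
    System.out.println(writer.open("app-1", 0, 4, 2, 0)); // 2
  }
}
```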





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023481199


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
+    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+        registerShuffle(
+            shuffleId,
+            numMappers,
+            numMappers,
+            () ->
+                driverRssMetaService.askSync(
+                    RegisterMapPartitionTask$.MODULE$.apply(
+                        appId, shuffleId, numMappers, mapId, attemptId, partitionId),
+                    ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+    return partitionLocationMap.get(partitionId);
+  }
+
+  public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(

Review Comment:
   Good advice.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023484255


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -53,7 +54,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   private val pushReplicateEnabled = conf.pushReplicateEnabled
   private val partitionSplitThreshold = conf.partitionSplitThreshold
   private val partitionSplitMode = conf.partitionSplitMode
-  private val partitionType = conf.shufflePartitionType
+  // shuffle id -> partition type
+  private val shufflePartitionType = new ConcurrentHashMap[Int, PartitionType]()

Review Comment:
   I don't think we should limit this. Maybe we can add this in the future.
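Keying the partition type by shuffle ID, as the `shufflePartitionType` map in the diff does, leaves room for one application to register both kinds of shuffle. A minimal Java sketch of such a registry; the class name, the `register`/`typeOf` methods and the first-registration-wins rule are assumptions for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mirror of the two partition types discussed in the review.
enum PartitionType { REDUCE, MAP }

// Records a partition type per shuffle ID instead of one type per application.
class ShufflePartitionTypes {
  private final ConcurrentHashMap<Integer, PartitionType> byShuffleId =
      new ConcurrentHashMap<>();

  // Stores the type on first registration; later calls get the stored type back.
  PartitionType register(int shuffleId, PartitionType type) {
    PartitionType previous = byShuffleId.putIfAbsent(shuffleId, type);
    return previous == null ? type : previous;
  }

  PartitionType typeOf(int shuffleId, PartitionType fallback) {
    return byShuffleId.getOrDefault(shuffleId, fallback);
  }

  public static void main(String[] args) {
    ShufflePartitionTypes types = new ShufflePartitionTypes();
    types.register(0, PartitionType.MAP);
    types.register(1, PartitionType.REDUCE);
    System.out.println(types.typeOf(0, PartitionType.REDUCE)); // MAP
    System.out.println(types.typeOf(1, PartitionType.MAP)); // REDUCE
  }
}
```

`putIfAbsent` keeps concurrent registrations of the same shuffle from overwriting each other; whether a conflicting second type should instead be rejected is left open here.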





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #973: [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1023481662


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -257,13 +259,58 @@ private String genAddressPair(PartitionLocation loc) {
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
       String appId, int shuffleId, int numMappers, int numPartitions) {
+    return registerShuffle(
+        shuffleId,
+        numMappers,
+        numMappers,
+        () ->
+            driverRssMetaService.askSync(
+                RegisterShuffle$.MODULE$.apply(appId, shuffleId, numMappers, numPartitions),
+                ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+  }
+
+  @Override
+  public PartitionLocation registerMapPartitionTask(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
+    int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+    logger.info(
+        "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
+        mapId,
+        attemptId,
+        partitionId);
+    if (attemptId == 0) {
+      return registerMapPartitionTaskWithFirstAttempt(
+          appId, shuffleId, numMappers, mapId, attemptId, partitionId);
+    }
+
+    // TODO
+    throw new UnsupportedOperationException("can not register shuffle task with attempt beyond 0");
+  }
+
+  private PartitionLocation registerMapPartitionTaskWithFirstAttempt(
+      String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
+    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
+        registerShuffle(
+            shuffleId,
+            numMappers,
+            numMappers,
+            () ->
+                driverRssMetaService.askSync(
+                    RegisterMapPartitionTask$.MODULE$.apply(
+                        appId, shuffleId, numMappers, mapId, attemptId, partitionId),
+                    ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
+    return partitionLocationMap.get(partitionId);
+  }
+
+  public ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
+      int shuffleId,
+      int numMappers,
+      int numPartitions,
+      Callable<PbRegisterShuffleResponse> callable) {

Review Comment:
   This registerShuffleInternal function will be responsible for both registerShuffle and registerMapPartitionTask, so the Callable is needed.
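Passing a `Callable` instead of a ready-made `PbRegisterShuffleResponse` defers the RPC, so the shared method can re-issue it if it retries. A minimal sketch, assuming the shared method keeps a retry loop; `RegisterResponse`, `MAX_RETRIES` and the `String` location values are hypothetical simplifications of the real types:

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for PbRegisterShuffleResponse.
final class RegisterResponse {
  final boolean success;
  final Map<Integer, String> locations; // partition ID -> location (simplified)

  RegisterResponse(boolean success, Map<Integer, String> locations) {
    this.success = success;
    this.locations = locations;
  }
}

class RegisterShuffleSketch {
  static final int MAX_RETRIES = 3; // assumed retry budget

  // Callers choose WHICH request to send (RegisterShuffle or RegisterMapPartitionTask);
  // this method decides how often to send it.
  static ConcurrentHashMap<Integer, String> registerShuffleInternal(
      int shuffleId, Callable<RegisterResponse> request) throws Exception {
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
      RegisterResponse response = request.call(); // issues a fresh request each time
      if (response.success) {
        return new ConcurrentHashMap<>(response.locations);
      }
    }
    throw new IllegalStateException(
        "register shuffle " + shuffleId + " failed after " + MAX_RETRIES + " attempts");
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    ConcurrentHashMap<Integer, String> locations =
        registerShuffleInternal(
            7,
            () -> {
              calls[0]++;
              // Fail the first request, succeed on the second.
              return calls[0] < 2
                  ? new RegisterResponse(false, Map.of())
                  : new RegisterResponse(true, Map.of(0, "worker-a"));
            });
    System.out.println(calls[0] + " " + locations); // 2 {0=worker-a}
  }
}
```

Had the caller passed a pre-computed response instead, a failed first answer could not be retried with a new request.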





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #973: [CELEBORN-8] [ISSUE-952][FEATURE] support register shuffle task in map partition mode

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #973:
URL: https://github.com/apache/incubator-celeborn/pull/973#discussion_r1024010098


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -379,6 +402,33 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       shuffleId: Int,
       numMappers: Int,
       numReducers: Int): Unit = {
+    handleOfferAndReserveSlots(context, applicationId, shuffleId, numMappers, numReducers)

Review Comment:
   handleOfferAndReserveSlots -> offerAndReserveSlots



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -394,7 +444,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             .values()
             .asScala
             .flatMap(_.getAllMasterLocationsWithMinEpoch(shuffleId.toString).asScala)
-            .filter(_.getEpoch == 0)
+            .filter(p =>
+              (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || (partitionType == PartitionType.MAP && p.getId == partitionId))

Review Comment:
   Do we have an assumption that for MapPartition there will always be only one epoch?
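The predicate under discussion can be exercised in isolation. A minimal Java sketch (the diff itself is Scala), assuming a simplified hypothetical `Location` class in place of `PartitionLocation`; it shows that the MAP branch ignores the epoch, so if a map partition ever had more than one epoch for the same ID, every one of them would pass the filter:

```java
import java.util.List;
import java.util.stream.Collectors;

enum PartitionType { REDUCE, MAP }

// Simplified stand-in for PartitionLocation: only the fields the filter reads.
final class Location {
  final int id;
  final int epoch;

  Location(int id, int epoch) {
    this.id = id;
    this.epoch = epoch;
  }

  @Override
  public String toString() {
    return "Location(" + id + ", epoch " + epoch + ")";
  }
}

class LocationFilterSketch {
  // Same predicate as the diff: REDUCE keeps epoch 0, MAP keeps the task's own ID.
  static List<Location> select(List<Location> all, PartitionType type, int partitionId) {
    return all.stream()
        .filter(
            p ->
                (type == PartitionType.REDUCE && p.epoch == 0)
                    || (type == PartitionType.MAP && p.id == partitionId))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Location> all =
        List.of(new Location(0, 0), new Location(0, 1), new Location(5, 0), new Location(5, 1));
    System.out.println(select(all, PartitionType.REDUCE, -1).size()); // 2
    // MAP ignores the epoch, so both epochs of ID 5 pass.
    System.out.println(select(all, PartitionType.MAP, 5).size()); // 2
  }
}
```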


