Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/24 12:44:37 UTC

[GitHub] [incubator-celeborn] RexXiong opened a new pull request, #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

RexXiong opened a new pull request, #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003

   # [BUG]/[FEATURE] title
   
   ### What changes were proposed in this pull request?
   - Support mapper end for map partitions
   - Support getting the file group for both map partitions and reduce partitions
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#issuecomment-1337613611

   I think we should add regression test cases covering the changed logic, because it's critical.




[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1042161585


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -201,6 +234,24 @@ class PartitionLocationInfo extends Logging {
     }
   }
 
+  def removeAllRelatedPartitions(
+      shuffleKey: String,
+      partitionIdOpt: Option[Int]): Unit = this
+    .synchronized {
+      partitionIdOpt match {
+        case Some(partitionId) =>
+          if (masterPartitionLocations.containsKey(shuffleKey)) {
+            masterPartitionLocations.get(shuffleKey).remove(partitionId)

Review Comment:
   Releasing slots is mainly for the Controller; agreed, this can be improved later.





[GitHub] [incubator-celeborn] leesf commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
leesf commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1031932528


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -152,6 +152,16 @@ public abstract void mapperEnd(
       String applicationId, int shuffleId, int mapId, int attemptId, int numMappers)
       throws IOException;
 
+  // Report partition locations written by the completed Map Partition type mapper
+  public abstract void mapPartitionMapperEnd(

Review Comment:
   Sorry, I am a little confused by the name. Would you please describe the method in more detail?





[GitHub] [incubator-celeborn] RexXiong commented on pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#issuecomment-1338647711

   > I think we should add regression test cases covering the changed logic, because it's critical.
   
   Agree.




[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1040360754


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -276,13 +328,15 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
                         stageEndShuffleSet.contains(shuffleId)) {
                         logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
                         shuffleCommittedInfo.commitPartitionRequests.clear()
-                        Map.empty[WorkerInfo, Set[PartitionLocation]]
+                        Map.empty[WorkerInfo, mutable.Set[PartitionLocation]]
                       } else {
                         val batch = new util.HashSet[CommitPartitionRequest]()
                         batch.addAll(shuffleCommittedInfo.commitPartitionRequests)
                         val currentBatch = batch.asScala.filterNot { request =>
                           shuffleCommittedInfo.handledCommitPartitionRequests
-                            .contains(request.partition)
+                            .contains(request.partition) && isPartitionInProcess(

Review Comment:
   Separate this into different branches; all of the map/reduce commit-files logic can be refactored into MapPartitionCommitHandler and ReducePartitionCommitHandler later.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1041823675


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -201,6 +234,24 @@ class PartitionLocationInfo extends Logging {
     }
   }
 
+  def removeAllRelatedPartitions(
+      shuffleKey: String,
+      partitionIdOpt: Option[Int]): Unit = this
+    .synchronized {
+      partitionIdOpt match {
+        case Some(partitionId) =>
+          if (masterPartitionLocations.containsKey(shuffleKey)) {
+            masterPartitionLocations.get(shuffleKey).remove(partitionId)

Review Comment:
   LifecycleManager only needs to release its memory, whereas the Controller wants to release its slots, but they use the same function (am I right?). If so, it can be improved later.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1036751761


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -147,11 +147,21 @@ public abstract int mergeData(
   public abstract void pushMergedData(String applicationId, int shuffleId, int mapId, int attemptId)
       throws IOException;
 
-  // Report partition locations written by the completed map task
+  // Report partition locations written by the completed map task of ReducePartition Shuffle Type
   public abstract void mapperEnd(
       String applicationId, int shuffleId, int mapId, int attemptId, int numMappers)
       throws IOException;
 
+  // Report partition locations written by the completed map task of MapPartition Shuffle Type
+  public abstract void mapPartitionMapperEnd(
+      String applicationId,
+      int shuffleId,
+      int mapId,

Review Comment:
   Do we need mapId? PartitionId already contains mapId





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1037156128


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -147,11 +147,21 @@ public abstract int mergeData(
   public abstract void pushMergedData(String applicationId, int shuffleId, int mapId, int attemptId)
       throws IOException;
 
-  // Report partition locations written by the completed map task
+  // Report partition locations written by the completed map task of ReducePartition Shuffle Type
   public abstract void mapperEnd(
       String applicationId, int shuffleId, int mapId, int attemptId, int numMappers)
       throws IOException;
 
+  // Report partition locations written by the completed map task of MapPartition Shuffle Type
+  public abstract void mapPartitionMapperEnd(
+      String applicationId,
+      int shuffleId,
+      int mapId,

Review Comment:
   > Do we need mapId? PartitionId already contains mapId
   
   Not strictly necessary, but to avoid requiring the lifecycle manager to understand the semantics of partitionId, I think adding mapId would be better.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1037910911


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -147,11 +147,21 @@ public abstract int mergeData(
   public abstract void pushMergedData(String applicationId, int shuffleId, int mapId, int attemptId)
       throws IOException;
 
-  // Report partition locations written by the completed map task
+  // Report partition locations written by the completed map task of ReducePartition Shuffle Type
   public abstract void mapperEnd(
       String applicationId, int shuffleId, int mapId, int attemptId, int numMappers)
       throws IOException;
 
+  // Report partition locations written by the completed map task of MapPartition Shuffle Type
+  public abstract void mapPartitionMapperEnd(
+      String applicationId,
+      int shuffleId,
+      int mapId,

Review Comment:
   Then maybe we'd better comment on partitionId





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1039739552


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -276,13 +328,15 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
                         stageEndShuffleSet.contains(shuffleId)) {
                         logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
                         shuffleCommittedInfo.commitPartitionRequests.clear()
-                        Map.empty[WorkerInfo, Set[PartitionLocation]]
+                        Map.empty[WorkerInfo, mutable.Set[PartitionLocation]]
                       } else {
                         val batch = new util.HashSet[CommitPartitionRequest]()
                         batch.addAll(shuffleCommittedInfo.commitPartitionRequests)
                         val currentBatch = batch.asScala.filterNot { request =>
                           shuffleCommittedInfo.handledCommitPartitionRequests
-                            .contains(request.partition)
+                            .contains(request.partition) && isPartitionInProcess(

Review Comment:
   Is && correct here? It will always be false for reduce partitions.
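
A minimal sketch of the concern raised above, not the PR's code. `filterNot` drops a request only when the whole predicate is true, so if the second operand is always false for reduce partitions (as the comment suggests), already-handled reduce requests are never filtered out. `Request`, `handled`, and `inProcess` are hypothetical stand-ins for `CommitPartitionRequest`, `handledCommitPartitionRequests`, and the state behind `isPartitionInProcess`; the `split` variant is one possible alternative, not the merged fix.

```scala
object FilterPredicateSketch {
  // Hypothetical stand-in for CommitPartitionRequest; only the fields needed here.
  final case class Request(partition: String, isMapPartition: Boolean)

  // Assumption taken from the review comment: the in-process check is
  // always false for reduce partitions.
  def isPartitionInProcess(r: Request, inProcess: Set[String]): Boolean =
    r.isMapPartition && inProcess.contains(r.partition)

  // Predicate shaped like the one in the diff: handled && inProcess.
  def combined(batch: Seq[Request], handled: Set[String], inProcess: Set[String]): Seq[Request] =
    batch.filterNot(r => handled.contains(r.partition) && isPartitionInProcess(r, inProcess))

  // One possible alternative (an assumption): branch on the partition type so
  // that reduce partitions are filtered on the handled set alone.
  def split(batch: Seq[Request], handled: Set[String], inProcess: Set[String]): Seq[Request] =
    batch.filterNot { r =>
      if (r.isMapPartition) handled.contains(r.partition) && isPartitionInProcess(r, inProcess)
      else handled.contains(r.partition)
    }
}
```

With two reduce requests of which one is already handled, `combined` keeps both while `split` keeps only the unhandled one.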



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1086,23 +1159,112 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       inProcessStageEndShuffleSet.add(shuffleId)
     }
 
-    // ask allLocations workers holding partitions to commit files
+    // ask allocated workers holding partitions to commit files
+    val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+    val dataLost = handleCommitFiles(applicationId, shuffleId, allocatedWorkers, None)
+
+    // reply
+    if (!dataLost) {
+      logInfo(s"Succeed to handle stageEnd for $shuffleId.")
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    } else {
+      logError(s"Failed to handle stageEnd for $shuffleId, lost file!")
+      dataLostShuffleSet.add(shuffleId)
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    }
+    inProcessStageEndShuffleSet.remove(shuffleId)
+
+    // release resources and clear worker info
+    workerSnapshots(shuffleId).asScala.foreach { case (_, partitionLocationInfo) =>
+      partitionLocationInfo.removeMasterPartitions(shuffleId.toString)
+      partitionLocationInfo.removeSlavePartitions(shuffleId.toString)
+    }
+
+    requestReleaseSlots(
+      rssHARetryClient,
+      ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
+  }
+
+  private def handleMapPartitionEnd(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      mapId: Int,
+      attemptId: Int,
+      partitionId: Int): Unit = {
+    def reply(result: Boolean): Unit = {
+      val message =
+        s"to handle MapPartitionEnd for ${Utils.makeMapKey(appId, shuffleId, mapId, attemptId)}, " +
+          s"$partitionId.";
+      result match {
+        case true => // if already committed by another try
+          logDebug(s"Succeed $message")
+          context.reply(MapperEndResponse(StatusCode.SUCCESS))
+        case false =>
+          logError(s"Failed $message")
+          context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
+      }
+    }
+
+    val partitionIds =
+      inProcessMapPartitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new util.HashSet[Int]())
+    val shufflePartitionResult =
+      partitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new ConcurrentHashMap[Int, Boolean]())
+
+    partitionIds.synchronized {
+      if (partitionIds.contains(partitionId)) {
+        // client can retry this code
+        context.reply(MapperEndResponse(StatusCode.COMMIT_FILE_IN_PROCESS))

Review Comment:
   return after reply
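
A small sketch of why an early return matters after replying, under the assumption that more reply-producing code follows the guard (the quoted hunk is truncated, so whether that is the case in the PR is not visible here). `RecordingContext` is a hypothetical stand-in for `RpcCallContext`, and the status strings only mirror the names used in the diff.

```scala
import scala.collection.mutable

object ReplyThenReturnSketch {
  // Hypothetical stand-in for RpcCallContext: records every reply it receives.
  final class RecordingContext {
    val replies = mutable.ArrayBuffer.empty[String]
    def reply(status: String): Unit = replies += status
  }

  // Without an early return, execution continues past the guard and a
  // second reply is sent for the same request.
  def withoutReturn(ctx: RecordingContext, inProcess: Boolean): Unit = {
    if (inProcess) {
      ctx.reply("COMMIT_FILE_IN_PROCESS")
    }
    ctx.reply("SUCCESS") // stands in for the commit-and-reply path
  }

  // Returning right after the reply guarantees exactly one reply per request.
  def withReturn(ctx: RecordingContext, inProcess: Boolean): Unit = {
    if (inProcess) {
      ctx.reply("COMMIT_FILE_IN_PROCESS")
      return
    }
    ctx.reply("SUCCESS")
  }
}
```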



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1086,23 +1159,112 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       inProcessStageEndShuffleSet.add(shuffleId)
     }
 
-    // ask allLocations workers holding partitions to commit files
+    // ask allocated workers holding partitions to commit files
+    val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+    val dataLost = handleCommitFiles(applicationId, shuffleId, allocatedWorkers, None)
+
+    // reply
+    if (!dataLost) {
+      logInfo(s"Succeed to handle stageEnd for $shuffleId.")
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    } else {
+      logError(s"Failed to handle stageEnd for $shuffleId, lost file!")
+      dataLostShuffleSet.add(shuffleId)
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    }
+    inProcessStageEndShuffleSet.remove(shuffleId)
+
+    // release resources and clear worker info
+    workerSnapshots(shuffleId).asScala.foreach { case (_, partitionLocationInfo) =>
+      partitionLocationInfo.removeMasterPartitions(shuffleId.toString)
+      partitionLocationInfo.removeSlavePartitions(shuffleId.toString)
+    }
+
+    requestReleaseSlots(
+      rssHARetryClient,
+      ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
+  }
+
+  private def handleMapPartitionEnd(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      mapId: Int,
+      attemptId: Int,
+      partitionId: Int): Unit = {
+    def reply(result: Boolean): Unit = {
+      val message =
+        s"to handle MapPartitionEnd for ${Utils.makeMapKey(appId, shuffleId, mapId, attemptId)}, " +
+          s"$partitionId.";
+      result match {
+        case true => // if already committed by another try
+          logDebug(s"Succeed $message")
+          context.reply(MapperEndResponse(StatusCode.SUCCESS))
+        case false =>
+          logError(s"Failed $message")
+          context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
+      }
+    }
+
+    val partitionIds =
+      inProcessMapPartitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new util.HashSet[Int]())
+    val shufflePartitionResult =
+      partitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new ConcurrentHashMap[Int, Boolean]())
+
+    partitionIds.synchronized {
+      if (partitionIds.contains(partitionId)) {
+        // client can retry this code
+        context.reply(MapperEndResponse(StatusCode.COMMIT_FILE_IN_PROCESS))
+      } else if (shufflePartitionResult.containsKey(partitionId)) {
+        reply(shufflePartitionResult.get(partitionId))

Review Comment:
   ditto



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -143,7 +148,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       currentShuffleFileCount: LongAdder,
       commitPartitionRequests: util.Set[CommitPartitionRequest],
       handledCommitPartitionRequests: util.Set[PartitionLocation],
-      inFlightCommitRequest: AtomicInteger)
+      inFlightCommitRequest: AtomicInteger,
+      inFlightPartitionCommitRequest: ConcurrentHashMap[Int, AtomicInteger])

Review Comment:
   The difference between inFlightPartitionCommitRequest and inFlightCommitRequest is a bit confusing.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1086,23 +1159,112 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       inProcessStageEndShuffleSet.add(shuffleId)
     }
 
-    // ask allLocations workers holding partitions to commit files
+    // ask allocated workers holding partitions to commit files
+    val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+    val dataLost = handleCommitFiles(applicationId, shuffleId, allocatedWorkers, None)
+
+    // reply
+    if (!dataLost) {
+      logInfo(s"Succeed to handle stageEnd for $shuffleId.")
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    } else {
+      logError(s"Failed to handle stageEnd for $shuffleId, lost file!")
+      dataLostShuffleSet.add(shuffleId)
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    }
+    inProcessStageEndShuffleSet.remove(shuffleId)
+
+    // release resources and clear worker info
+    workerSnapshots(shuffleId).asScala.foreach { case (_, partitionLocationInfo) =>
+      partitionLocationInfo.removeMasterPartitions(shuffleId.toString)
+      partitionLocationInfo.removeSlavePartitions(shuffleId.toString)
+    }
+
+    requestReleaseSlots(
+      rssHARetryClient,
+      ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
+  }
+
+  private def handleMapPartitionEnd(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      mapId: Int,
+      attemptId: Int,
+      partitionId: Int): Unit = {
+    def reply(result: Boolean): Unit = {
+      val message =
+        s"to handle MapPartitionEnd for ${Utils.makeMapKey(appId, shuffleId, mapId, attemptId)}, " +
+          s"$partitionId.";
+      result match {
+        case true => // if already committed by another try
+          logDebug(s"Succeed $message")
+          context.reply(MapperEndResponse(StatusCode.SUCCESS))
+        case false =>
+          logError(s"Failed $message")
+          context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
+      }
+    }
+
+    val partitionIds =

Review Comment:
   inprocessPartitionIds





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1040361076


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -143,7 +148,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       currentShuffleFileCount: LongAdder,
       commitPartitionRequests: util.Set[CommitPartitionRequest],
       handledCommitPartitionRequests: util.Set[PartitionLocation],
-      inFlightCommitRequest: AtomicInteger)
+      inFlightCommitRequest: AtomicInteger,
+      inFlightPartitionCommitRequest: ConcurrentHashMap[Int, AtomicInteger])

Review Comment:
   Rename these to allInFlightPartitionCommitRequest and partitionInFlightCommitRequest instead.
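
To illustrate the distinction the two names are meant to carry, here is a hedged sketch: one shuffle-wide counter next to one counter per partition id, matching the `AtomicInteger` and `ConcurrentHashMap[Int, AtomicInteger]` types in the diff. The class `InFlightCounters` and its `begin`/`end` methods are hypothetical; how the PR actually updates the counters is not shown in this thread.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical holder for the two counters; not the PR's ShuffleCommittedInfo.
final class InFlightCounters {
  // One counter covering every in-flight commit request of the shuffle.
  val allInFlight = new AtomicInteger(0)
  // One counter per partition id.
  val perPartition = new ConcurrentHashMap[Int, AtomicInteger]()

  // putIfAbsent returns null when the key was absent, else the existing value.
  private def counterFor(partitionId: Int): AtomicInteger = {
    val fresh = new AtomicInteger(0)
    val existing = perPartition.putIfAbsent(partitionId, fresh)
    if (existing == null) fresh else existing
  }

  def begin(partitionId: Int): Unit = {
    allInFlight.incrementAndGet()
    counterFor(partitionId).incrementAndGet()
  }

  def end(partitionId: Int): Unit = {
    counterFor(partitionId).decrementAndGet()
    allInFlight.decrementAndGet()
  }

  def inFlightFor(partitionId: Int): Int = counterFor(partitionId).get()
}
```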





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1042173632


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -43,6 +41,7 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.rpc._
 import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNettyRpcCallContext}
 import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.FunctionConverter._

Review Comment:
   unused import





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1031981259


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -152,6 +152,16 @@ public abstract void mapperEnd(
       String applicationId, int shuffleId, int mapId, int attemptId, int numMappers)
       throws IOException;
 
+  // Report partition locations written by the completed Map Partition type mapper
+  public abstract void mapPartitionMapperEnd(

Review Comment:
   Just modified the method description to make it clearer





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1042845749


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -43,6 +41,7 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.rpc._
 import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNettyRpcCallContext}
 import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.FunctionConverter._

Review Comment:
   It's an implicit conversion from a Scala function to a Java function, needed only for Scala 2.11.
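
A sketch of the kind of conversion being described, assuming a converter of roughly this shape (the contents of Celeborn's `FunctionConverter` are not shown in this thread, so `FunctionConverterSketch` and its method are hypothetical). A Scala function value is not a `java.util.function.Function`, so passing one to `computeIfAbsent` only compiles when an implicit conversion is in scope. That is why the import can look unused. From Scala 2.12 on, function literals convert to Java functional interfaces directly.

```scala
import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.language.implicitConversions

// Hypothetical converter written for this sketch.
object FunctionConverterSketch {
  implicit def scalaFunctionToJava[T, R](f: T => R): java.util.function.Function[T, R] =
    new java.util.function.Function[T, R] {
      override def apply(t: T): R = f(t)
    }
}

object ComputeIfAbsentSketch {
  import FunctionConverterSketch._

  val inProcess = new ConcurrentHashMap[Int, util.Set[Int]]()

  // A plain Scala function value; the implicit conversion imported above
  // turns it into a java.util.function.Function at the call site below.
  val newSet: Int => util.Set[Int] = _ => new util.HashSet[Int]()

  def partitionIdsFor(shuffleId: Int): util.Set[Int] =
    inProcess.computeIfAbsent(shuffleId, newSet)
}
```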





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1041114357


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -75,24 +93,39 @@ class PartitionLocationInfo extends Logging {
   }
 
   def getAllMasterLocations(shuffleKey: String): util.List[PartitionLocation] = this.synchronized {
-    if (masterPartitionLocations.containsKey(shuffleKey)) {
-      masterPartitionLocations.get(shuffleKey)
-        .values()
-        .asScala
-        .flatMap(_.asScala)
-        .toList
-        .asJava
-    } else {
-      new util.ArrayList[PartitionLocation]()
-    }
+    getMasterLocations(shuffleKey)
   }
 
   def getAllSlaveLocations(shuffleKey: String): util.List[PartitionLocation] = this.synchronized {
-    if (slavePartitionLocations.containsKey(shuffleKey)) {
-      slavePartitionLocations.get(shuffleKey)
+    getSlaveLocations(shuffleKey)
+  }
+
+  def getMasterLocations(
+      shuffleKey: String,
+      partitionIdOpt: Option[Int] = None): util.List[PartitionLocation] = {
+    getLocations(shuffleKey, masterPartitionLocations, partitionIdOpt)
+  }
+
+  def getSlaveLocations(
+      shuffleKey: String,
+      partitionIdOpt: Option[Int] = None): util.List[PartitionLocation] = {
+    getLocations(shuffleKey, slavePartitionLocations, partitionIdOpt)
+  }
+
+  private def getLocations(
+      shuffleKey: String,
+      partitionInfo: PartitionInfo,
+      partitionIdOpt: Option[Int] = None): util.List[PartitionLocation] = this.synchronized {
+    if (partitionInfo.containsKey(shuffleKey)) {
+      partitionInfo.get(shuffleKey)
         .values()
         .asScala
         .flatMap(_.asScala)
+        .filter(p =>

Review Comment:
   It's inefficient to check for each element. Better to put the if in the beginning.
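
One way to read this suggestion, sketched under assumptions: decide on `partitionIdOpt` once, and when a partition id is given, do a single map lookup instead of flattening every location and filtering each element. The nested-map shape of `PartitionInfo` is inferred from the calls in the diff, and `String` stands in for `PartitionLocation`; this is not the code that was merged.

```scala
import java.util
import scala.collection.JavaConverters._

object LocationLookupSketch {
  // Assumed shape: shuffleKey -> (partitionId -> locations).
  type PartitionInfo = util.Map[String, util.Map[Int, util.List[String]]]

  def getLocations(
      shuffleKey: String,
      partitionInfo: PartitionInfo,
      partitionIdOpt: Option[Int] = None): util.List[String] = {
    val byPartition = partitionInfo.get(shuffleKey)
    if (byPartition == null) {
      new util.ArrayList[String]()
    } else {
      partitionIdOpt match {
        // Decided once, up front: a single lookup for the requested partition.
        case Some(partitionId) =>
          val locations = byPartition.get(partitionId)
          if (locations == null) new util.ArrayList[String]()
          else new util.ArrayList[String](locations)
        // No partition requested: flatten the locations of every partition.
        case None =>
          new util.ArrayList[String](
            byPartition.values().asScala.flatMap(_.asScala).toList.asJava)
      }
    }
  }
}
```

Both branches return a fresh list, so callers cannot mutate the stored locations through the result.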



##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -201,6 +234,24 @@ class PartitionLocationInfo extends Logging {
     }
   }
 
+  def removeAllRelatedPartitions(
+      shuffleKey: String,
+      partitionIdOpt: Option[Int]): Unit = this
+    .synchronized {
+      partitionIdOpt match {
+        case Some(partitionId) =>
+          if (masterPartitionLocations.containsKey(shuffleKey)) {
+            masterPartitionLocations.get(shuffleKey).remove(partitionId)

Review Comment:
   It seems the logic for removing map partitions is inconsistent with that for reduce partitions.





[GitHub] [incubator-celeborn] waitinfuture merged pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
waitinfuture merged PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003




[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1003: [CELEBORN-56] [ISSUE-945] handle map partition mapper end

Posted by GitBox <gi...@apache.org>.
RexXiong commented on code in PR #1003:
URL: https://github.com/apache/incubator-celeborn/pull/1003#discussion_r1040359606


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1086,23 +1159,112 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       inProcessStageEndShuffleSet.add(shuffleId)
     }
 
-    // ask allLocations workers holding partitions to commit files
+    // ask allocated workers holding partitions to commit files
+    val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+    val dataLost = handleCommitFiles(applicationId, shuffleId, allocatedWorkers, None)
+
+    // reply
+    if (!dataLost) {
+      logInfo(s"Succeed to handle stageEnd for $shuffleId.")
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    } else {
+      logError(s"Failed to handle stageEnd for $shuffleId, lost file!")
+      dataLostShuffleSet.add(shuffleId)
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    }
+    inProcessStageEndShuffleSet.remove(shuffleId)
+
+    // release resources and clear worker info
+    workerSnapshots(shuffleId).asScala.foreach { case (_, partitionLocationInfo) =>
+      partitionLocationInfo.removeMasterPartitions(shuffleId.toString)
+      partitionLocationInfo.removeSlavePartitions(shuffleId.toString)
+    }
+
+    requestReleaseSlots(
+      rssHARetryClient,
+      ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
+  }
+
+  private def handleMapPartitionEnd(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      mapId: Int,
+      attemptId: Int,
+      partitionId: Int): Unit = {
+    def reply(result: Boolean): Unit = {
+      val message =
+        s"to handle MapPartitionEnd for ${Utils.makeMapKey(appId, shuffleId, mapId, attemptId)}, " +
+          s"$partitionId.";
+      result match {
+        case true => // if already committed by another try
+          logDebug(s"Succeed $message")
+          context.reply(MapperEndResponse(StatusCode.SUCCESS))
+        case false =>
+          logError(s"Failed $message")
+          context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
+      }
+    }
+
+    val partitionIds =
+      inProcessMapPartitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new util.HashSet[Int]())
+    val shufflePartitionResult =
+      partitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new ConcurrentHashMap[Int, Boolean]())
+
+    partitionIds.synchronized {
+      if (partitionIds.contains(partitionId)) {
+        // client can retry this code
+        context.reply(MapperEndResponse(StatusCode.COMMIT_FILE_IN_PROCESS))
+      } else if (shufflePartitionResult.containsKey(partitionId)) {
+        reply(shufflePartitionResult.get(partitionId))

Review Comment:
   Thanks
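
   For readers following the hunk above, the three-way check on `partitionIds` and `shufflePartitionResult` can be sketched in isolation. This is only an illustration under assumptions: `Tracker`, `commit` and the `Reply` objects are hypothetical names, and the final branch (run the commit, record the result, clear the in-process marker) is a guess, because the quoted diff is cut off before that branch.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object MapPartitionEndSketch {
  sealed trait Reply
  case object Success extends Reply
  case object ShuffleDataLost extends Reply
  case object CommitFileInProcess extends Reply

  // `commit` is a hypothetical callback that returns true when files were committed.
  final class Tracker(commit: Int => Boolean) {
    private val inProcess = mutable.Set.empty[Int]
    private val results = mutable.Map.empty[Int, Boolean]

    private def toReply(ok: Boolean): Reply = if (ok) Success else ShuffleDataLost

    def handleEnd(partitionId: Int): Reply = {
      val early: Option[Reply] = this.synchronized {
        if (inProcess.contains(partitionId)) {
          // Another attempt is committing right now; the caller may retry later.
          Some(CommitFileInProcess)
        } else if (results.contains(partitionId)) {
          // Already committed by an earlier attempt; replay the recorded outcome.
          Some(toReply(results(partitionId)))
        } else {
          inProcess += partitionId
          None
        }
      }
      early.getOrElse {
        // Assumed remainder: commit outside the lock, then publish the result.
        val ok = try commit(partitionId) catch { case NonFatal(_) => false }
        this.synchronized {
          results(partitionId) = ok
          inProcess -= partitionId
        }
        toReply(ok)
      }
    }
  }
}
```

   Recording the result before clearing the in-process marker, under the same lock, means a concurrent caller sees either "in process" or the final outcome, never neither.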



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1086,23 +1159,112 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       inProcessStageEndShuffleSet.add(shuffleId)
     }
 
-    // ask allLocations workers holding partitions to commit files
+    // ask allocated workers holding partitions to commit files
+    val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+    val dataLost = handleCommitFiles(applicationId, shuffleId, allocatedWorkers, None)
+
+    // reply
+    if (!dataLost) {
+      logInfo(s"Succeed to handle stageEnd for $shuffleId.")
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    } else {
+      logError(s"Failed to handle stageEnd for $shuffleId, lost file!")
+      dataLostShuffleSet.add(shuffleId)
+      // record in stageEndShuffleSet
+      stageEndShuffleSet.add(shuffleId)
+    }
+    inProcessStageEndShuffleSet.remove(shuffleId)
+
+    // release resources and clear worker info
+    workerSnapshots(shuffleId).asScala.foreach { case (_, partitionLocationInfo) =>
+      partitionLocationInfo.removeMasterPartitions(shuffleId.toString)
+      partitionLocationInfo.removeSlavePartitions(shuffleId.toString)
+    }
+
+    requestReleaseSlots(
+      rssHARetryClient,
+      ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
+  }
+
+  private def handleMapPartitionEnd(
+      context: RpcCallContext,
+      applicationId: String,
+      shuffleId: Int,
+      mapId: Int,
+      attemptId: Int,
+      partitionId: Int): Unit = {
+    def reply(result: Boolean): Unit = {
+      val message =
+        s"to handle MapPartitionEnd for ${Utils.makeMapKey(appId, shuffleId, mapId, attemptId)}, " +
+          s"$partitionId.";
+      result match {
+        case true => // if already committed by another try
+          logDebug(s"Succeed $message")
+          context.reply(MapperEndResponse(StatusCode.SUCCESS))
+        case false =>
+          logError(s"Failed $message")
+          context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
+      }
+    }
+
+    val partitionIds =
+      inProcessMapPartitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new util.HashSet[Int]())
+    val shufflePartitionResult =
+      partitionEndMap.computeIfAbsent(shuffleId, (k: Int) => new ConcurrentHashMap[Int, Boolean]())
+
+    partitionIds.synchronized {
+      if (partitionIds.contains(partitionId)) {
+        // client can retry this code
+        context.reply(MapperEndResponse(StatusCode.COMMIT_FILE_IN_PROCESS))

Review Comment:
   Ok


