You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/27 08:01:27 UTC

[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #990: [ISSUE-989][FEATURE] Support batch commit hard split partition before stage end

waitinfuture commented on code in PR #990:
URL: https://github.com/apache/incubator-celeborn/pull/990#discussion_r1032874711


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -669,6 +799,12 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     override def apply(s: Int): util.Set[Integer] = new util.HashSet[Integer]()
   }
 
+  private val commitPartitionRegisterFunc =
+    new util.function.Function[Int, util.Set[CommitPartitionRequest]]() {
+      override def apply(s: Int): util.Set[CommitPartitionRequest] =
+        new util.HashSet[CommitPartitionRequest]()

Review Comment:
   Ditto: I think we should use a concurrent set here as well.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -124,6 +124,29 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   // shuffleId -> set of partition id
   private val inBatchPartitions = new ConcurrentHashMap[Int, util.Set[Integer]]()
 
+  case class CommitPartitionRequest(
+      applicationId: String,
+      shuffleId: Int,
+      oldPartition: PartitionLocation)
+
+  case class ShuffleCommittedInfo(
+      committedMasterIds: util.List[String],
+      committedSlaveIds: util.List[String],
+      failedMasterPartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      failedSlavePartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      committedMasterStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedSlaveStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
+      currentShuffleFileCount: LongAdder)
+
+  private val commitPartitionRequests =
+    new ConcurrentHashMap[Int, util.Set[CommitPartitionRequest]]()

Review Comment:
   I think we should use a concurrent set here.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -246,6 +283,88 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         batchHandleChangePartitionRequestInterval,
         TimeUnit.MILLISECONDS)
     }
+
+    batchHandleCommitPartitionSchedulerThread.foreach {
+      _.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = {
+            logWarning("Batch commit hard split partition")
+            commitPartitionRequests.asScala.foreach { case (shuffleId, requests) =>
+              batchHandleCommitPartitionExecutors.submit {
+                new Runnable {
+                  override def run(): Unit = {
+                    val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)

Review Comment:
   Agreed, let's do the refactor in a follow-up PR.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -124,6 +124,29 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
   // shuffleId -> set of partition id
   private val inBatchPartitions = new ConcurrentHashMap[Int, util.Set[Integer]]()
 
+  case class CommitPartitionRequest(
+      applicationId: String,
+      shuffleId: Int,
+      oldPartition: PartitionLocation)
+
+  case class ShuffleCommittedInfo(
+      committedMasterIds: util.List[String],
+      committedSlaveIds: util.List[String],
+      failedMasterPartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      failedSlavePartitionIds: ConcurrentHashMap[String, WorkerInfo],
+      committedMasterStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedSlaveStorageInfos: ConcurrentHashMap[String, StorageInfo],
+      committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
+      currentShuffleFileCount: LongAdder)
+
+  private val commitPartitionRequests =
+    new ConcurrentHashMap[Int, util.Set[CommitPartitionRequest]]()
+  private val inBatchCommitPartitionRequests =
+    new ConcurrentHashMap[Int, util.Set[CommitPartitionRequest]]()

Review Comment:
   Ditto: I think we should use a concurrent set here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org