You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/25 10:26:10 UTC

[incubator-celeborn] branch main updated: [CELEBORN-59][REFACTOR] Support send destroy slots request in parallel (#1004)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fe13e9e2 [CELEBORN-59][REFACTOR] Support send destroy slots request in parallel (#1004)
fe13e9e2 is described below

commit fe13e9e26138c362f1c81844d0c838f83cc09a81
Author: nafiy <30...@users.noreply.github.com>
AuthorDate: Fri Nov 25 18:26:05 2022 +0800

    [CELEBORN-59][REFACTOR] Support send destroy slots request in parallel (#1004)
---
 .../main/scala/org/apache/celeborn/client/LifecycleManager.scala  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index e03f9707..917f908b 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1506,7 +1506,11 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       shuffleId: Int,
       slotsToDestroy: WorkerResource): Unit = {
     val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
-    slotsToDestroy.asScala.foreach { case (workerInfo, (masterLocations, slaveLocations)) =>
+    val parallelism = Math.min(Math.max(1, slotsToDestroy.size()), conf.rpcMaxParallelism)
+    ThreadUtils.parmap(
+      slotsToDestroy.asScala,
+      "DestroySlot",
+      parallelism) { case (workerInfo, (masterLocations, slaveLocations)) =>
       val destroy = Destroy(
         shuffleKey,
         masterLocations.asScala.map(_.getUniqueId).asJava,
@@ -1514,7 +1518,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       var res = requestDestroy(workerInfo.endpoint, destroy)
       if (res.status != StatusCode.SUCCESS) {
         logDebug(s"Request $destroy return ${res.status} for " +
-          s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
+          s"${Utils.makeShuffleKey(applicationId, shuffleId)}, will retry request destroy.")
         res = requestDestroy(
           workerInfo.endpoint,
           Destroy(shuffleKey, res.failedMasters, res.failedSlaves))