You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/25 10:26:10 UTC
[incubator-celeborn] branch main updated: [CELEBORN-59][REFACTOR] Support send destroy slots request in parallel (#1004)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new fe13e9e2 [CELEBORN-59][REFACTOR] Support send destroy slots request in parallel (#1004)
fe13e9e2 is described below
commit fe13e9e26138c362f1c81844d0c838f83cc09a81
Author: nafiy <30...@users.noreply.github.com>
AuthorDate: Fri Nov 25 18:26:05 2022 +0800
[CELEBORN-59][REFACTOR] Support send destroy slots request in parallel (#1004)
---
.../main/scala/org/apache/celeborn/client/LifecycleManager.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index e03f9707..917f908b 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1506,7 +1506,11 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
shuffleId: Int,
slotsToDestroy: WorkerResource): Unit = {
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
- slotsToDestroy.asScala.foreach { case (workerInfo, (masterLocations, slaveLocations)) =>
+ val parallelism = Math.min(Math.max(1, slotsToDestroy.size()), conf.rpcMaxParallelism)
+ ThreadUtils.parmap(
+ slotsToDestroy.asScala,
+ "DestroySlot",
+ parallelism) { case (workerInfo, (masterLocations, slaveLocations)) =>
val destroy = Destroy(
shuffleKey,
masterLocations.asScala.map(_.getUniqueId).asJava,
@@ -1514,7 +1518,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
var res = requestDestroy(workerInfo.endpoint, destroy)
if (res.status != StatusCode.SUCCESS) {
logDebug(s"Request $destroy return ${res.status} for " +
- s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
+ s"${Utils.makeShuffleKey(applicationId, shuffleId)}, will retry request destroy.")
res = requestDestroy(
workerInfo.endpoint,
Destroy(shuffleKey, res.failedMasters, res.failedSlaves))