You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/06 21:45:11 UTC

[GitHub] [spark] holdenk commented on a change in pull request #29817: [SPARK-32850][CORE][K8S] Simplify the RPC message flow of decommission

holdenk commented on a change in pull request #29817:
URL: https://github.com/apache/spark/pull/29817#discussion_r500612097



##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -668,8 +672,14 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case WorkerDecommission(_, _) =>
+    case DecommissionWorker =>
+      decommissionSelf()
+
+    case WorkerSigPWRReceived =>
       decommissionSelf()
+      // Tell the Master that we are starting decommissioning
+      // so it stops trying to launch executor/driver on us
+      sendToMaster(WorkerDecommissioning(workerId, self))

Review comment:
       To make this code testable, I'd move it down inside decommissionSelf and give decommissionSelf a parameter indicating whether the decommission originated locally.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -465,72 +464,50 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param executorsAndDecomInfo Identifiers of executors & decommission info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
+   * @param triggeredByExecutor whether the decommission is triggered at executor.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
+      adjustTargetNumExecutors: Boolean,
+      triggeredByExecutor: Boolean): Seq[String] = withLock {
     // Do not change this code without running the K8s integration suites
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
-        // Only bother decommissioning executors which are alive.
-        if (isExecutorActive(executorId)) {
-          executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
-        } else {
-          false
-        }
+    val executorsToDecommission = executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
+      // Only bother decommissioning executors which are alive.
+      if (isExecutorActive(executorId)) {
+        scheduler.executorDecommission(executorId, decomInfo)
+        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        Some(executorId)
+      } else {
+        None
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
-  // Do not change this code without running the K8s integration suites
-  private def doDecommission(executorId: String,
-      decomInfo: ExecutorDecommissionInfo): Boolean = {
-
-    logInfo(s"Asking executor $executorId to decommissioning.")
-    scheduler.executorDecommission(executorId, decomInfo)
-    // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily).
-    CoarseGrainedSchedulerBackend.this.synchronized {
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor $executorId.")
-          return false
-      }
+    // Mark those corresponding BlockManagers as decommissioned first before we sending
+    // decommission notification to executors. So, it's less likely to lead to the race
+    // condition where `getPeer` request from the decommissioned executor comes first
+    // before the BlockManagers are marked as decommissioned.
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      logInfo(s"Asking BlockManagers on executors (${executorsToDecommission.mkString(", ")}) " +
+        s"to decommissioning.")
+      scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
     }
-    logInfo(s"Asked executor $executorId to decommission.")
 
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      try {
-        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
-        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-      } catch {
-        case e: Exception =>
-          logError("Unexpected error during block manager " +
-            s"decommissioning for executor $executorId: ${e.toString}", e)
-          return false
+    if (!triggeredByExecutor) {

Review comment:
       I think we might want to move this up, because doing the storage decommissioning before the executor itself is marked for decommissioning could mean that local blocks get stored in a manner where we might not know to migrate them. If you've dug through the code here and you don't believe that happens, that's fine.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -163,8 +163,7 @@ class BlockManagerMasterEndpoint(
       context.reply(true)
 
     case DecommissionBlockManagers(executorIds) =>
-      val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get)
-      decommissionBlockManagers(bmIds)
+      decommissioningBlockManagerSet ++= executorIds.flatMap(blockManagerIdByExecutor.get)

Review comment:
       Maybe we can add a comment, similar to the previous one on decommissionBlockManagers, stating why we are doing this and its intended effect.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -465,72 +464,50 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param executorsAndDecomInfo Identifiers of executors & decommission info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
+   * @param triggeredByExecutor whether the decommission is triggered at executor.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
+      adjustTargetNumExecutors: Boolean,
+      triggeredByExecutor: Boolean): Seq[String] = withLock {

Review comment:
       Not 100% sure, but I do remember that the locking mechanism withLock did manage to deadlock before. It might be good to add a logging statement in here that verifies this block is called in the integration test.

##########
File path: core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
##########
@@ -122,7 +122,10 @@ class AppClientSuite
 
       // Send a decommission self to all the workers
       // Note: normally the worker would send this on their own.
-      workers.foreach(worker => worker.decommissionSelf())
+      workers.foreach { worker =>
+       worker.decommissionSelf()
+       master.self.send(WorkerDecommissioning(worker.workerId, worker.self))

Review comment:
       I left a comment up above about how to improve the testing. I think that part of the test should cover the message being sent from the worker, so we probably don't want to manually send that message here.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1809,7 +1809,9 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }
 
-  def decommissionBlockManager(): Unit = synchronized {
+  def decommissionBlockManager(): Unit = storageEndpoint.ask(DecommissionBlockManager)

Review comment:
       Makes sense, although introducing a new name, instead of changing the use of a previous function name, might make this easier to verify.

##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
##########
@@ -41,9 +41,8 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       mainClass = "",
       expectedLogOnCompletion = Seq(
         "Finished waiting, stopping Spark",
-        "Received decommission executor message",
-        "Acknowledged decommissioning block manager",
-        ": Executor decommission.",
+        "Asking BlockManagers", "to decommissioning",
+        "Asking executor", "to decommissioning.",

Review comment:
       nit: "to decommissioning" and "to decommissioning." are close enough that you really only need the second to cover both cases, since we're looking for the string anywhere. If you wanted to, you could switch this to a regex and do more accurate matching. Up to you.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org