Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/12 01:57:19 UTC

[GitHub] [spark] agrawaldevesh commented on a change in pull request #29722: [SPARK-32850][CORE] Simplify the RPC message flow of decommission

agrawaldevesh commented on a change in pull request #29722:
URL: https://github.com/apache/spark/pull/29722#discussion_r487349021



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -94,8 +94,9 @@ private[spark] trait ExecutorAllocationClient {
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def decommissionExecutors(
-    executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-    adjustTargetNumExecutors: Boolean): Seq[String] = {
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],

Review comment:
       I keep messing this up myself, but is this indentation style change intentional? If 2 spaces are okay, then can we remain at that?
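
   If I'm reading the style guide right, the two variants under discussion look roughly like this (illustrative sketch only; Spark usually uses a 4-space continuation indent for multi-line parameter lists and 2 spaces for the body):

   ```scala
   // 4-space continuation indent for the parameter list (what the diff switches to).
   def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
       adjustTargetNumExecutors: Boolean): Seq[String] = {
     // 2-space indent for the method body.
     ???
   }

   // 2-space continuation indent (the previous style in this file).
   def decommissionExecutors(
     executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
     adjustTargetNumExecutors: Boolean): Seq[String] = {
     ???
   }
   ```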

##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -61,13 +61,28 @@ private[deploy] object DeployMessages {
   }
 
   /**
+   * An internal message that used by Master itself, in order to handle the
+   * `DecommissionWorkersOnHosts` request from `MasterWebUI` asynchronously.
+   * @param ids A collection of Worker ids, which are pending to be decommissioned.

Review comment:
       nit: which are pending to be decommissioned -> which should be decommissioned ?

##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -245,14 +245,14 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
 
-    case WorkerDecommission(id, workerRef) =>
-      logInfo("Recording worker %s decommissioning".format(id))

Review comment:
       We don't need to handle the MasterInStandby case anymore? I guess all messages from worker to master should have this check, right?
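
   For concreteness, the kind of guard I have in mind for worker-originated messages is roughly this (a sketch only, not the actual Master code):

   ```scala
   case WorkerDecommissioned(id) =>
     if (state == RecoveryState.STANDBY) {
       // A standby master shouldn't mutate worker state; the active master will handle it.
       logWarning(s"Ignoring WorkerDecommissioned($id) because this master is in STANDBY state.")
     } else {
       // ... record the worker as decommissioned ...
     }
   ```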

##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -768,11 +768,13 @@ private[deploy] class Worker(
     }
   }
 
-  private[deploy] def decommissionSelf(): Boolean = {
+  private[deploy] def decommissionSelf(fromMaster: Boolean): Boolean = {
     if (conf.get(config.DECOMMISSION_ENABLED)) {
       logDebug("Decommissioning self")
       decommissioned = true
-      sendToMaster(WorkerDecommission(workerId, self))
+      if (!fromMaster) {
+        sendToMaster(WorkerDecommissioned(workerId))

Review comment:
       I wonder if it is trite to add a comment like: "No need to notify the master if the decommission message already came from it"
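
   Concretely, something like (just a suggestion):

   ```scala
   decommissioned = true
   if (!fromMaster) {
     // No need to notify the master if the decommission request already came from it.
     sendToMaster(WorkerDecommissioned(workerId))
   }
   ```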

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -95,8 +95,13 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
-  case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
-    extends CoarseGrainedClusterMessage
+  // A message that sent from executor to driver to tell driver that the executor has been
+  // used. It's used for the case where decommission is triggered at executor (e.g., K8S)
+  case class ExecutorDecommissioned(executorId: String)
+
+  // A message that sent from driver to executor to decommission that executor.
+  // It's used for Standalone's case yet, where decommission is triggered at Worker.

Review comment:
       remove 'yet'. Are you sure that this is when the decommission is triggered at the Worker ? I think it can also be used when the decommissioning is triggered at the MasterWebUI (not necessarily a SIGPWR at the Worker).
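
   As a wording suggestion only (the exact declarations here are from memory, so please double-check them against the PR):

   ```scala
   // Sent from an executor to the driver to tell the driver that the executor has
   // already started decommissioning itself (e.g. it received a SIGPWR, as on K8S).
   case class ExecutorDecommissioned(executorId: String) extends CoarseGrainedClusterMessage

   // Sent from the driver to an executor to ask that executor to decommission itself.
   // Used when decommissioning is initiated on the driver side, e.g. by the Master /
   // MasterWebUI in Standalone mode or by dynamic allocation.
   case object DecommissionExecutor extends CoarseGrainedClusterMessage
   ```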

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -166,17 +166,6 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
-        if (decommissioned) {

Review comment:
       Are you sure that this case won't happen? Or perhaps you mean that even if it does happen, it only lasts for a short duration (until the Driver learns that the executor is decommissioned), so it's not worth handling? If so, I agree with you.
   
   But I just want to confirm when this case can even happen: a task launched on a decommissioned executor?

##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -891,16 +891,13 @@ private[deploy] class Master(
     logInfo(s"Decommissioning the workers with host:ports ${workersToRemoveHostPorts}")
 
     // The workers are removed async to avoid blocking the receive loop for the entire batch
-    workersToRemove.foreach(wi => {
-      logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
-      self.send(WorkerDecommission(wi.id, wi.endpoint))
-    })
+    self.send(DecommissionWorkers(workersToRemove.map(_.id).toSeq))
 
     // Return the count of workers actually removed
     workersToRemove.size
   }
 
-  private def decommissionWorker(worker: WorkerInfo): Unit = {
+  private def decommissionWorker(worker: WorkerInfo, sentFromWorker: Boolean): Unit = {

Review comment:
       Would triggeredByWorker be a better name than sentFromWorker ?

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -581,7 +581,10 @@ private[spark] class ExecutorAllocationManager(
       if (decommissionEnabled) {
         val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
           id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
-        client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
+        client.decommissionExecutors(
+          executorIdsWithoutHostLoss,
+          adjustTargetNumExecutors = false,
+          decommissionFromDriver = true)

Review comment:
       I think decommissionFromDriver = false means that the decommission is triggered by a SIGPWR on the executor.
   
   Whereas decommissionFromDriver = true means that it might be triggered from either the Master or the dynamic allocation manager.
   
   If so, should we flip decommissionFromDriver and just call it decomTriggeredByExecutor?
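
   In other words, roughly (sketch only):

   ```scala
   def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
       adjustTargetNumExecutors: Boolean,
       decomTriggeredByExecutor: Boolean): Seq[String]

   // A driver-side caller such as the dynamic allocation manager would then read as:
   client.decommissionExecutors(
     executorIdsWithoutHostLoss,
     adjustTargetNumExecutors = false,
     decomTriggeredByExecutor = false)
   ```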

##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -768,11 +768,13 @@ private[deploy] class Worker(
     }
   }
 
-  private[deploy] def decommissionSelf(): Boolean = {
+  private[deploy] def decommissionSelf(fromMaster: Boolean): Boolean = {

Review comment:
       For consistency: triggeredByMaster instead of fromMaster ?

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -264,17 +253,21 @@ private[spark] class CoarseGrainedExecutorBackend(
     System.exit(code)
   }
 
-  private def decommissionSelf(): Boolean = {
+  private def decommissionSelf(fromDriver: Boolean): Boolean = {
     val msg = "Decommissioning self w/sync"
     logInfo(msg)
     try {
       decommissioned = true
-      // Tell master we are are decommissioned so it stops trying to schedule us
-      if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(
-          executorId, ExecutorDecommissionInfo(msg)))
-      } else {
-        logError("No driver to message decommissioning.")
+      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {

Review comment:
       I see, so is this now the ONLY place where block manager decommissioning is triggered? Good.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -272,10 +268,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case DecommissionExecutor(executorId, decommissionInfo) =>
-        logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
-        context.reply(decommissionExecutor(executorId, decommissionInfo,
-          adjustTargetNumExecutors = false))
+      case ExecutorDecommissioned(executorId) =>
+        logWarning(s"Received executor $executorId decommissioned message")
+        context.reply(
+          decommissionExecutor(
+            executorId,
+            ExecutorDecommissionInfo(s"Executor $executorId is decommissioned."),
+            adjustTargetNumExecutors = false,
+            // TODO: add a new type like `ExecutorDecommissionInfo` for the case where executor

Review comment:
       This is a good TODO comment, should we add this at the definition of the `decommissionExecutor` method instead of at the caller ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
+      adjustTargetNumExecutors: Boolean,
+      decommissionFromDriver: Boolean): Seq[String] = {

Review comment:
       nice ! I like the change to flatMap and withLock.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1809,7 +1811,7 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }
 
-  def decommissionBlockManager(): Unit = synchronized {
+  private[spark] def decommissionSelf(): Unit = synchronized {

Review comment:
       Didn't follow the need for the name change.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
+      adjustTargetNumExecutors: Boolean,
+      decommissionFromDriver: Boolean): Seq[String] = {
+    val executorsToDecommission = withLock {
+      executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
+          scheduler.executorDecommission(executorId, decomInfo)
           executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
+          Some(executorId)
         } else {
-          false
+          None
         }
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
+    // Mark those corresponding BlockManagers as decommissioned first before we sending
+    // decommission notification to executors. So, it's less likely to lead to the race
+    // condition where `getPeer` request from the decommissioned executor comes first
+    // before the BlockManagers are marked as decommissioned.
+    scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
 
-  private def doDecommission(executorId: String,
-      decomInfo: ExecutorDecommissionInfo): Boolean = {
-
-    logInfo(s"Asking executor $executorId to decommissioning.")
-    scheduler.executorDecommission(executorId, decomInfo)
-    // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily).
-    CoarseGrainedSchedulerBackend.this.synchronized {
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor $executorId.")
-          return false
-      }
-    }
-    logInfo(s"Asked executor $executorId to decommission.")
-
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      try {
-        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
-        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-      } catch {
-        case e: Exception =>
-          logError("Unexpected error during block manager " +
-            s"decommissioning for executor $executorId: ${e.toString}", e)
-          return false
+    if (decommissionFromDriver) {
+      CoarseGrainedSchedulerBackend.this.synchronized {

Review comment:
       Why the lock here ? It wasn't locked before (I think).

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
+      adjustTargetNumExecutors: Boolean,
+      decommissionFromDriver: Boolean): Seq[String] = {
+    val executorsToDecommission = withLock {
+      executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
+          scheduler.executorDecommission(executorId, decomInfo)
           executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
+          Some(executorId)
         } else {
-          false
+          None
         }
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
+    // Mark those corresponding BlockManagers as decommissioned first before we sending
+    // decommission notification to executors. So, it's less likely to lead to the race
+    // condition where `getPeer` request from the decommissioned executor comes first
+    // before the BlockManagers are marked as decommissioned.
+    scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
 
-  private def doDecommission(executorId: String,
-      decomInfo: ExecutorDecommissionInfo): Boolean = {
-
-    logInfo(s"Asking executor $executorId to decommissioning.")
-    scheduler.executorDecommission(executorId, decomInfo)
-    // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily).
-    CoarseGrainedSchedulerBackend.this.synchronized {
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor $executorId.")
-          return false
-      }
-    }
-    logInfo(s"Asked executor $executorId to decommission.")
-
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      try {
-        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
-        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-      } catch {
-        case e: Exception =>
-          logError("Unexpected error during block manager " +
-            s"decommissioning for executor $executorId: ${e.toString}", e)
-          return false
+    if (decommissionFromDriver) {
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        executorsToDecommission.foreach { executorId =>
+          logInfo(s"Asking executor $executorId to decommissioning.")
+          executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor)

Review comment:
        The executorDataMap(executorId) lookup might fail if an executor was removed between the end of the withLock on L485 and L500. The original code handled that. Should we handle it too? (We don't have to log anything, just not crash the driver.)
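
   A defensive version could look roughly like this (sketch):

   ```scala
   if (decommissionFromDriver) {
     CoarseGrainedSchedulerBackend.this.synchronized {
       executorsToDecommission.foreach { executorId =>
         // The executor may have been removed since we released the lock above, so use
         // `get` instead of `apply` to avoid throwing and crashing the driver.
         executorDataMap.get(executorId).foreach { executorInfo =>
           logInfo(s"Asking executor $executorId to decommission.")
           executorInfo.executorEndpoint.send(DecommissionExecutor)
         }
       }
     }
   }
   ```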

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
+      adjustTargetNumExecutors: Boolean,
+      decommissionFromDriver: Boolean): Seq[String] = {
+    val executorsToDecommission = withLock {
+      executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
+          scheduler.executorDecommission(executorId, decomInfo)
           executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
+          Some(executorId)
         } else {
-          false
+          None
         }
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
+    // Mark those corresponding BlockManagers as decommissioned first before we sending
+    // decommission notification to executors. So, it's less likely to lead to the race
+    // condition where `getPeer` request from the decommissioned executor comes first
+    // before the BlockManagers are marked as decommissioned.
+    scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)

Review comment:
       Should this call be guarded by `conf.get(STORAGE_DECOMMISSION_ENABLED)`? Also, should it only be done when decommissionFromDriver == true?
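
   i.e., something along the lines of (sketch):

   ```scala
   // Only involve the block manager master when storage decommissioning is enabled
   // (and, per the question above, possibly only when decommissionFromDriver is true).
   if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
     scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
   }
   ```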
   

##########
File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -31,7 +31,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
 class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
 
   override def beforeEach(): Unit = {
-    val conf = new SparkConf().setAppName("test").setMaster("local")

Review comment:
       Is master = "local" the default ? And that's why the .setMaster("local") is removed ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
+      adjustTargetNumExecutors: Boolean,
+      decommissionFromDriver: Boolean): Seq[String] = {
+    val executorsToDecommission = withLock {
+      executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
+          scheduler.executorDecommission(executorId, decomInfo)
           executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
+          Some(executorId)
         } else {
-          false
+          None
         }
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
+    // Mark those corresponding BlockManagers as decommissioned first before we sending
+    // decommission notification to executors. So, it's less likely to lead to the race
+    // condition where `getPeer` request from the decommissioned executor comes first
+    // before the BlockManagers are marked as decommissioned.
+    scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
 
-  private def doDecommission(executorId: String,
-      decomInfo: ExecutorDecommissionInfo): Boolean = {
-
-    logInfo(s"Asking executor $executorId to decommissioning.")
-    scheduler.executorDecommission(executorId, decomInfo)
-    // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily).
-    CoarseGrainedSchedulerBackend.this.synchronized {
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor $executorId.")
-          return false
-      }
-    }
-    logInfo(s"Asked executor $executorId to decommission.")
-
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      try {
-        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
-        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))

Review comment:
       One of the differences from the old design: clearing the block managers from the block manager master and sending the actual decommission message to the block manager used to happen in one place (in scheduler.sc.env.blockManager.master.decommissionBlockManagers).
   
   Whereas now these two things are split up: scheduler.sc.env.blockManager.master.decommissionBlockManagers no longer sends the message, it just updates its in-memory state.
   
   Is there a problem with this difference? Is some race opened up or closed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org