Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/13 23:44:40 UTC

[GitHub] [spark] agrawaldevesh commented on a change in pull request #28817: [WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished, built on top of spark20629

agrawaldevesh commented on a change in pull request #28817:
URL: https://github.com/apache/spark/pull/28817#discussion_r439777235



##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
     System.exit(code)
   }
 
-  private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
-    try {
-      decommissioned = true
-      // Tell master we are are decommissioned so it stops trying to schedule us
-      if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+  private var previousAllBlocksMigrated = false
+  private def shutdownIfDone(): Unit = {
+    val numRunningTasks = executor.numRunningTasks
+    logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.")
+    if (executor.numRunningTasks == 0) {
+      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+        val allBlocksMigrated = env.blockManager.decommissionManager match {
+          case Some(m) => m.allBlocksMigrated
+          case None => false // We haven't started migrations yet.
+        }
+        if (allBlocksMigrated && previousAllBlocksMigrated) {
+          logInfo("No running tasks, all blocks migrated, stopping.")
+          exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+        }
+        previousAllBlocksMigrated = allBlocksMigrated
       } else {
-        logError("No driver to message decommissioning.")
+        logInfo("No running tasks, no block migration configured, stopping.")
+        exitExecutor(0, "Finished decommissioning", notifyDriver = true)
       }
-      if (executor != null) {
-        executor.decommission()
+    } else {
+      // If there's a running task it could store blocks.

Review comment:
       I think this interplay between previousAllBlocksMigrated and allBlocksMigrated is a bit confusing. It's not clear why the previous state has to be considered. I wonder if code along the following lines would make this "history" aspect clearer:
   
   ```
   val allBlocksMigrated = !env.conf.get(STORAGE_DECOMMISSION_ENABLED) ||
     env.blockManager.decommissionManager.map(_.allBlocksMigrated).getOrElse(false)
   val exitCondition = allBlocksMigrated && numRunningTasks == 0
   if (exitCondition) { exitExecutor(...) }
   ```
   
   Also, should we really be checking numRunningTasks here? What if some race condition caused tasks to be scheduled onto us while we were already marked for decommissioning?
   
   Finally, should there be a timeout on how long the executor will stay alive in the decommissioned state?
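   If so, something along these lines could work, purely as a sketch (the config key and the deadline wiring are made up for illustration, not part of this PR):
   
   ```
   // Sketch only: the config key below is hypothetical.
   private val decommissionDeadlineMs =
     System.currentTimeMillis() +
       env.conf.getTimeAsMs("spark.executor.decommission.timeout", "600s")
   
   private def shutdownIfDone(): Unit = {
     if (System.currentTimeMillis() > decommissionDeadlineMs) {
       logWarning("Decommission timeout reached; exiting even if block migration is incomplete.")
       exitExecutor(0, "Decommission timeout reached", notifyDriver = true)
     }
     // ... existing "no running tasks and all blocks migrated" check ...
   }
   ```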

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
     System.exit(code)
   }
 
-  private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
-    try {
-      decommissioned = true
-      // Tell master we are are decommissioned so it stops trying to schedule us
-      if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+  private var previousAllBlocksMigrated = false
+  private def shutdownIfDone(): Unit = {
+    val numRunningTasks = executor.numRunningTasks
+    logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.")
+    if (executor.numRunningTasks == 0) {
+      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+        val allBlocksMigrated = env.blockManager.decommissionManager match {
+          case Some(m) => m.allBlocksMigrated
+          case None => false // We haven't started migrations yet.
+        }
+        if (allBlocksMigrated && previousAllBlocksMigrated) {
+          logInfo("No running tasks, all blocks migrated, stopping.")
+          exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+        }
+        previousAllBlocksMigrated = allBlocksMigrated
       } else {
-        logError("No driver to message decommissioning.")
+        logInfo("No running tasks, no block migration configured, stopping.")
+        exitExecutor(0, "Finished decommissioning", notifyDriver = true)
       }
-      if (executor != null) {
-        executor.decommission()
+    } else {
+      // If there's a running task it could store blocks.
+      previousAllBlocksMigrated = false
+    }
+  }
+
+  private def decommissionSelf(): Boolean = {
+    if (!decommissioned) {
+      logInfo("Decommissioning self w/sync")

Review comment:
       Perhaps we should spell out what 'w/sync' stands for in the log message?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -2039,8 +2047,11 @@ private[spark] class BlockManager(
    * Class to handle block manager decommissioning retries
    * It creates a Thread to retry offloading all RDD cache blocks
    */
-  private class BlockManagerDecommissionManager(conf: SparkConf) {
+  private[spark] class BlockManagerDecommissionManager(conf: SparkConf) {
     @volatile private var stopped = false
+    // Since running tasks can add more blocks this can change.

Review comment:
       Just to make sure I am understanding this: you mean the tasks that were already running when decommissioning started on the executor? Because I think we refuse to launch new tasks once decommissioning has started, so any new blocks must be written by already-running tasks. Did I get this right?
   
   Also, just to confirm I am still following along: I don't see this case covered in the existing BlockManagerSuite; I believe we are not testing writing new blocks while the decom/offload is in progress. A test along the lines of the sketch below could cover that.
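   Purely as a sketch (the suite helpers `makeBlockManager`/`rdd` and the exact decommission entry point are assumptions, so the real test would need adapting to this branch):
   
   ```
   test("blocks written during decommissioning are also migrated") {
     // Sketch only: setup helpers and the decommission entry point are assumed.
     val store = makeBlockManager(8000, "exec-to-decommission")
     store.putSingle(rdd(0, 0), new Array[Byte](100), StorageLevel.MEMORY_ONLY)
   
     // Start offloading, then write another block while migration is in flight.
     store.decommissionBlockManager()
     store.putSingle(rdd(0, 1), new Array[Byte](100), StorageLevel.MEMORY_ONLY)
   
     // The late block should eventually be reported on a surviving peer.
     eventually(timeout(10.seconds), interval(100.millis)) {
       assert(master.getLocations(rdd(0, 1)).nonEmpty)
     }
   }
   ```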

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage
 
+  case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.

Review comment:
       IMHO, the DecommissionSelf naming is a bit ambiguous: who is "self" here, the sender or the receiver?
   
   This message is now sent from the driver to the executor, so perhaps we could just repurpose DecommissionExecutor with a check on the executorId, roughly as sketched below.
   
   Not a big deal, but trying to reduce the number of message types introduced by this feature ;)
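   Just a sketch of the idea (not tested against this branch):
   
   ```
   // In CoarseGrainedExecutorBackend.receive -- sketch only.
   case DecommissionExecutor(id) =>
     if (id == executorId) {
       decommissionSelf()
     } else {
       logWarning(s"Ignoring decommission request meant for executor $id.")
     }
   ```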

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
     System.exit(code)
   }
 
-  private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
-    try {
-      decommissioned = true
-      // Tell master we are are decommissioned so it stops trying to schedule us
-      if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+  private var previousAllBlocksMigrated = false

Review comment:
       Should this variable be marked volatile?
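   i.e., something like the line below, assuming it can indeed be read and written from more than one thread:
   
   ```
   @volatile private var previousAllBlocksMigrated = false
   ```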

##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -233,6 +233,7 @@ private[spark] class Executor(
    * Mark an executor for decommissioning and avoid launching new tasks.
    */
   private[spark] def decommission(): Unit = {
+    logInfo("Executor asked to decommission. Starting shutdown thread.")

Review comment:
       I think this log message looks stale; it should probably be moved to CoarseGrainedExecutorBackend. It's also not clear to me what the `decommission` flag does in the Executor besides just logging.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1887,7 +1891,7 @@ private[spark] class BlockManager(
    * but rather shadows them.
    * Requires an Indexed based shuffle resolver.

Review comment:
       I think the comment needs to be updated to reflect what the returned Boolean indicates, e.g. along the lines of the sketch below.
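   A possible wording (sketch only; the exact success semantics of the return value are an assumption on my part):
   
   ```
   /**
    * ... (existing description: migrates the blocks, shadowing rather than deleting them;
    * requires an index-based shuffle resolver) ...
    *
    * @return true if all blocks were migrated successfully, false otherwise.
    */
   ```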




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


