Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/24 00:17:30 UTC

[GitHub] [spark] agrawaldevesh commented on a change in pull request #29211: [SPARK-31197][CORE] Shutdown executor once we are done decommissioning

agrawaldevesh commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r459787138



##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case DecommissionSelf =>
+    case WorkerDecommission(_, _) =>

Review comment:
       nit: Should this be `DecommissionWorker`? That sounds more like a command to me, whereas `WorkerDecommissioned` sounds like a state.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -80,6 +79,9 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
 
+  // Track our decommissioning status internally.

Review comment:
       nit: I think this comment is superfluous: a volatile Boolean var is clearly a flag that tracks something, and it is clearly internal.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")
+              }
+            } else {
+              logInfo("No running tasks, no block migration configured, stopping.")
+              exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+            }
+            Thread.sleep(sleep_time)

Review comment:
       nit: Can this Thread.sleep be unified with the one below ? 
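
One possible reading of this suggestion, as a sketch only: decide inside the branches whether the loop is finished, and sleep in a single place per iteration. `SingleSleepLoop`, `pollUntil`, `canShutdown` and `sleepMs` are hypothetical names, not Spark APIs.

```scala
object SingleSleepLoop {
  // Hypothetical helper: polls `canShutdown` until it returns true and
  // sleeps in exactly one place, whichever branch was taken.
  // Returns the number of polls that were needed.
  def pollUntil(canShutdown: () => Boolean, sleepMs: Long): Int = {
    var polls = 0
    var done = false
    while (!done) {
      polls += 1
      if (canShutdown()) {
        done = true            // the real code would call exitExecutor here
      } else {
        Thread.sleep(sleepMs)  // the only sleep in the loop
      }
    }
    polls
  }
}
```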

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.

Review comment:
       "for if all" -> "denoting if all the" ?

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()

Review comment:
       nit: How about unpacking the tuple into two named values to make it clear ? The _1 and _2 aren't very readable.
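
A minimal sketch of the unpacking, assuming the `(Long, Boolean)` return type shown in the diff. The `lastMigrationInfo` stub and its values are made up for illustration, and `migrationTime` is a hypothetical name.

```scala
object MigrationCheck {
  // Stand-in for env.blockManager.lastMigrationInfo(); the values are made up.
  def lastMigrationInfo(): (Long, Boolean) = (200L, true)

  def canExit(lastTaskRunningTime: Long): Boolean = {
    // Pattern-match the tuple into two named values instead of _1 / _2.
    val (migrationTime, allBlocksMigrated) = lastMigrationInfo()
    allBlocksMigrated && migrationTime > lastTaskRunningTime
  }
}
```

With named values the condition also fits on one line.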

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")

Review comment:
       Should we print the number of leftover blocks to get a sense of the burndown ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission it's self.

Review comment:
       it's self -> itself.
   
   Can you perhaps also add that the executor sends this message to itself, or is that already clear ? The "Used to ask" implies to me that someone else asks the executor.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
   private val sparkConf = new SparkConf(false)
     .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
     .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
+    // Just replicate blocks as fast as we can during testing, there isn't another
+    // workload we need to worry about.
+    .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

Review comment:
       Should this also be 10 seconds ? 
   
   "as fast as we can" does not usually jibe well with "interval of 10 seconds" :-P. 

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&

Review comment:
       nit: I think this line needn't be split.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {

Review comment:
       I think this should be a daemon thread ? Also can you name the thread so that it shows up in the jstack ?
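
A sketch of a named daemon thread using only `java.lang.Thread`. The thread name and the `newShutdownThread` helper are hypothetical. The sketch places the work in `run()` so that it executes on the new thread once `start()` is called.

```scala
object ShutdownThreadSketch {
  // Builds (but does not start) a named daemon thread around `body`.
  def newShutdownThread(body: () => Unit): Thread = {
    // The name is illustrative; it is what would show up in a jstack dump.
    val t = new Thread("wait-for-blocks-to-migrate") {
      override def run(): Unit = body()
    }
    t.setDaemon(true) // a daemon thread does not keep the JVM alive
    t
  }
}
```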

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -327,4 +357,28 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.
+   *  If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      (System.nanoTime(), true)

Review comment:
       I didn't follow why System.nanoTime is returned here.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")

Review comment:
       Should we log number of running tasks ? Also, should we bound the number of loops here and then print what the current iteration is ?
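
Roughly what a bounded loop with per-iteration logging could look like. This is a sketch under assumptions: `runningTasks` stands in for `executor.numRunningTasks`, `log` for `logInfo`, and `maxAttempts` is a hypothetical bound that the PR does not define.

```scala
object BoundedShutdownPoll {
  // Polls up to `maxAttempts` times, logging the attempt number and the
  // number of running tasks each time.
  // Returns Some(attempt) once no tasks remain, None if the bound was hit.
  def waitForIdle(
      runningTasks: () => Int,
      maxAttempts: Int,
      sleepMs: Long,
      log: String => Unit): Option[Int] = {
    var attempt = 1
    while (attempt <= maxAttempts) {
      val running = runningTasks()
      log(s"Shutdown check $attempt/$maxAttempts: $running running task(s)")
      if (running == 0) return Some(attempt)
      Thread.sleep(sleepMs)
      attempt += 1
    }
    None
  }
}
```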

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -442,6 +442,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           case e: Exception =>
             logError(s"Unexpected error during decommissioning ${e.toString}", e)
         }
+        // Send decommission message to the executor (it could have originated on the executor
+        // but not necessarily.

Review comment:
       Unterminated parenthesis.
   
   To bring your point out more clearly, should we say "Send a decommission message again to the executor"? (Note the added word 'again'.)

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)

Review comment:
       Will exitExecutor "break" the loop or should we put in an explicit "break" right after it to exit the thread ? Both here and below.
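
Scala has no `break` keyword, so an explicit exit would need either a loop flag or `scala.util.control.Breaks` from the standard library. A sketch of the latter, where `exit` is a stand-in for `exitExecutor` and the loop does not rely on whether that call returns:

```scala
object ExplicitLoopExit {
  import scala.util.control.Breaks.{break, breakable}

  // `ready` and `exit` are hypothetical stand-ins for the migration check
  // and for exitExecutor. Returns the number of iterations that ran.
  def run(ready: () => Boolean, exit: () => Unit, sleepMs: Long): Int = {
    var iterations = 0
    breakable {
      while (true) {
        iterations += 1
        if (ready()) {
          exit()
          break() // leave the loop even if exit() returns normally
        }
        Thread.sleep(sleepMs)
      }
    }
    iterations
  }
}
```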

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -88,5 +92,6 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     } finally {
         bmDecomManager.stop()
     }
+    bmDecomManager.stop()

Review comment:
       Why do we need this? The finally block above should run, right?
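
For reference, a `finally` block runs whether the `try` body completes normally or throws, which the sketch below counts. The local `stop` is a stand-in counter, not `BlockManagerDecommissioner.stop()`.

```scala
object FinallyDemo {
  // Runs `body` with a cleanup in `finally` and returns how many times
  // the cleanup ran.
  def runWithCleanup(body: () => Unit): Int = {
    var stops = 0
    def stop(): Unit = stops += 1
    try {
      try body() finally stop()
    } catch {
      case _: RuntimeException => // swallowed so the count can be returned
    }
    stops
  }
}
```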

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -91,10 +98,12 @@ private[storage] class BlockManagerDecommissioner(
                     null)// class tag, we don't need for shuffle
                   logDebug(s"Migrated sub block ${blockId}")
                 }
-                logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+                logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
               } else {
                 logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
               }
+              logInfo(s"Migrated ${shuffleBlockInfo}")

Review comment:
       Is it intentional that 'peer' was left out in this log message when moved from line 94 above ?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.
+   *  If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[spark] def lastMigrationInfo(): (Long, Boolean) = {

Review comment:
       What do you think of directly exposing the "BlocksMigrated" and "BlocksToMigrate" instead of a single boolean ? I think it will provide more debugging value than a single flag. 
   
   i.e., instead of returning "BlocksMigrated >= BlocksToMigrate", return the two counters directly.
   
   Or maybe this may be a lot more work because you want to track this for both shuffle as well as persisted blocks.
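
A hypothetical shape for returning the two counters, as a sketch. `MigrationProgress` and its members are illustrative names based on the counters mentioned above, and how shuffle and persisted blocks would be combined is left open.

```scala
// Hypothetical return type; not part of the PR.
final case class MigrationProgress(blocksMigrated: Int, blocksToMigrate: Int) {
  // The single boolean can still be derived by callers that only need it.
  def allMigrated: Boolean = blocksMigrated >= blocksToMigrate
  // Leftover count, useful when logging the burndown.
  def remaining: Int = math.max(0, blocksToMigrate - blocksMigrated)
}
```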
   
   

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -115,12 +124,19 @@ private[storage] class BlockManagerDecommissioner(
   // Shuffles which are either in queue for migrations or migrated
   private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
 
+  // Shuffles which have migrated. This used to know when we are "done", being done can change
+  // if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
   // Shuffles which are queued for migration & number of retries so far.
+  // Visible in storage for testing.
   private[storage] val shufflesToMigrate =
     new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
+  @volatile private var stoppedRDD = false

Review comment:
       Should we initialize this to `!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)` ? 
   
   This may obviate the change to `migrateBlock` below.
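
A sketch of the idea, with plain constructor booleans standing in for the two `conf.get(...)` lookups. Treating the shuffle flag the same way is an assumption here, and `DecommissionFlags` is a hypothetical class.

```scala
// `rddEnabled` / `shuffleEnabled` stand in for the config lookups.
class DecommissionFlags(rddEnabled: Boolean, shuffleEnabled: Boolean) {
  @volatile private var stopped = false
  // A migration type that is disabled starts out as already stopped.
  @volatile private var stoppedRDD = !rddEnabled
  @volatile private var stoppedShuffle = !shuffleEnabled

  def stop(): Unit = { stopped = true }
  def stopRDD(): Unit = { stoppedRDD = true }
  def stopShuffle(): Unit = { stoppedShuffle = true }

  // Same condition as the check at the top of lastMigrationInfo in the diff.
  def allStopped: Boolean = stopped || (stoppedRDD && stoppedShuffle)
}
```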

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}

Review comment:
       Why is atLeast renamed as least ? 

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
+  // Used for tracking if our migrations are complete.
+  @volatile private var lastRDDMigrationTime: Long = 0
+  @volatile private var lastShuffleMigrationTime: Long = 0
+  @volatile private var rddBlocksLeft: Boolean = true
+  @volatile private var shuffleBlocksLeft: Boolean = true
+

Review comment:
       Should we fold all these into a separate class with mutable fields, so that we can simply return a clone of it when `lastMigrationInfo` is called?
   
   I am not sure if it is worth the trouble, since it is only for info logging, in case we notice an executor that is not going away.
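
One way this could look, as a sketch. It swaps the "mutable fields plus clone" idea for an immutable case class updated with `copy`, so a snapshot is simply the current value. `MigrationState` and `MigrationTracker` are hypothetical names; the field names come from the diff.

```scala
// Immutable snapshot of the four tracking fields from the diff.
final case class MigrationState(
    lastRDDMigrationTime: Long = 0L,
    lastShuffleMigrationTime: Long = 0L,
    rddBlocksLeft: Boolean = true,
    shuffleBlocksLeft: Boolean = true)

class MigrationTracker {
  @volatile private var state = MigrationState()

  // Writers replace the whole state; synchronized so concurrent updates
  // from the RDD and shuffle paths do not overwrite each other.
  def recordRDD(time: Long, blocksLeft: Boolean): Unit = synchronized {
    state = state.copy(lastRDDMigrationTime = time, rddBlocksLeft = blocksLeft)
  }
  def recordShuffle(time: Long, blocksLeft: Boolean): Unit = synchronized {
    state = state.copy(lastShuffleMigrationTime = time, shuffleBlocksLeft = blocksLeft)
  }

  // Readers get a consistent, immutable snapshot with no cloning needed.
  def snapshot: MigrationState = state
}
```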

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
       // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
-      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

Review comment:
       Why was this changed to 10 seconds? If it is related to this PR (as opposed to just reducing flakiness), then please document that.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -133,22 +149,24 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run(): Unit = {
       assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedRDD && !Thread.interrupted()) {
         logInfo("Iterating on migrating from the block manager.")
         try {
+          val startTime = System.nanoTime()
           logDebug("Attempting to replicate all cached RDD blocks")
-          decommissionRddCacheBlocks()
+          rddBlocksLeft = decommissionRddCacheBlocks()
+          lastRDDMigrationTime = startTime
           logInfo("Attempt to replicate all cached blocks done")
           logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
           Thread.sleep(sleepInterval)
         } catch {
           case e: InterruptedException =>
-            logInfo("Interrupted during migration, will not refresh migrations.")
-            stopped = true
+            logInfo("Interrupted during RDD migration, stopping")

Review comment:
       Thanks for changing this to 'stopping'. "will not refresh migrations" was not clear. :-)



