Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/24 00:53:13 UTC

[GitHub] [spark] holdenk commented on a change in pull request #29211: [SPARK-31197][CORE] Shutdown executor once we are done decommissioning

holdenk commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r459800980



##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case DecommissionSelf =>
+    case WorkerDecommission(_, _) =>

Review comment:
       I don't have strong preferences (although this doesn't have `ed` at the end so I don't think it's implying a state).

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {

Review comment:
       For sure, good call.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.

Review comment:
       sounds good

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)

Review comment:
       So exitExecutor calls System.exit; there isn't going to be a loop for us to break out of.
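
       As a concrete illustration of why no break is needed: the last thing exitExecutor does is terminate the JVM, so the polling loop never runs another iteration. This is a self-contained sketch with hypothetical stand-ins (noRunningTasks, doneMigrating), not the PR's exact code:

       object ShutdownLoopSketch {
         // Hypothetical stand-ins for the backend's state; the real checks live in
         // CoarseGrainedExecutorBackend and BlockManager.
         @volatile var noRunningTasks = false
         @volatile var doneMigrating = false

         def exitExecutor(code: Int, reason: String): Unit = {
           println(s"Exiting ($code): $reason")
           System.exit(code) // the JVM stops here, so the loop below never resumes
         }

         def main(args: Array[String]): Unit = {
           val shutdownThread = new Thread("decommission-shutdown") {
             override def run(): Unit = {
               while (true) { // no break needed: exitExecutor never returns
                 if (noRunningTasks && doneMigrating) {
                   exitExecutor(0, "Finished decommissioning")
                 }
                 Thread.sleep(1000) // re-check once per second
               }
             }
           }
           shutdownThread.setDaemon(true)
           shutdownThread.start()
         }
       }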

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")
+              }
+            } else {
+              logInfo("No running tasks, no block migration configured, stopping.")
+              exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+            }
+            Thread.sleep(sleep_time)

Review comment:
       Good point

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -115,12 +124,19 @@ private[storage] class BlockManagerDecommissioner(
   // Shuffles which are either in queue for migrations or migrated
   private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
 
+  // Shuffles which have migrated. This used to know when we are "done", being done can change
+  // if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
   // Shuffles which are queued for migration & number of retries so far.
+  // Visible in storage for testing.
   private[storage] val shufflesToMigrate =
     new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
+  @volatile private var stoppedRDD = false

Review comment:
       Nice, sure.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -327,4 +357,28 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.
+   *  If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      (System.nanoTime(), true)

Review comment:
       So if we are not doing any migrations, all blocks are migrated as of right now.
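
       For reference, a self-contained sketch of what that short-circuit means in context. The else branch is reconstructed from the @volatile fields introduced earlier in this diff (lastRDDMigrationTime, rddBlocksLeft, etc.) and is an assumption, not a quote of the PR:

       class MigrationStatusSketch {
         @volatile private var stopped = false
         @volatile private var stoppedRDD = false
         @volatile private var stoppedShuffle = false
         @volatile private var lastRDDMigrationTime: Long = 0
         @volatile private var lastShuffleMigrationTime: Long = 0
         @volatile private var rddBlocksLeft: Boolean = true
         @volatile private var shuffleBlocksLeft: Boolean = true

         def lastMigrationInfo(): (Long, Boolean) = {
           if (stopped || (stoppedRDD && stoppedShuffle)) {
             // Nothing is migrating (or ever will be), so everything that will
             // migrate already has: "all migrated as of right now".
             (System.nanoTime(), true)
           } else {
             // Report the older of the two timestamps so the caller can compare it
             // against the last time it saw a task running.
             (math.min(lastRDDMigrationTime, lastShuffleMigrationTime),
               !rddBlocksLeft && !shuffleBlocksLeft)
           }
         }
       }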

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")

Review comment:
       I don't want to bound the number of loops because I think there are very legitimate cases where this could take a super long time and we just want to let it go. If there's a hard limit, it can probably be set by the external cluster manager, which will send a non-interruptible signal.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()

Review comment:
       Sure

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -442,6 +442,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           case e: Exception =>
             logError(s"Unexpected error during decommissioning ${e.toString}", e)
         }
+        // Send decommission message to the executor (it could have originated on the executor
+        // but not necessarily.

Review comment:
       So it would not always be a duplicate. It _may_ be a duplicate message, but it may not be (if it were always a duplicate I'd just drop it).
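
       Part of why a possibly-duplicate message is harmless is that the receiving side can treat decommissioning as idempotent. A minimal sketch of that idea (the decommissioned flag and startMigrations helper are illustrative, not the PR's code):

       import java.util.concurrent.atomic.AtomicBoolean

       class DecommissionHandlerSketch {
         private val decommissioned = new AtomicBoolean(false)

         // Safe to deliver more than once: only the first call does any work.
         def decommissionSelf(): Unit = {
           if (decommissioned.compareAndSet(false, true)) {
             startMigrations()
           } // else: duplicate message, nothing further to do
         }

         private def startMigrations(): Unit = {
           // placeholder for kicking off block / shuffle migration
         }
       }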

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
       // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
-      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

Review comment:
       Just reduced the log spam in debugging this test but still allowed the test to complete quickly.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")

Review comment:
       We could, although I'd rather keep that logging inside of Decommissioner. I'm not sure we need it; personally I'd prefer to add the logging statement if we find ourselves wishing we had it during development.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}

Review comment:
       There's something else which is somehow imported and named atLeast (I suspect from ScalaTest?) and I needed to rename it or the compiler got confused.
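
       For context, the rename only matters at the import site; after that the Mockito matcher is used under the new name. A small self-contained sketch (the trait and mock here are illustrative, not from the suite):

       import org.mockito.Mockito.{atLeast => least, mock, verify}

       trait BlockTransferSketch {
         def uploadBlock(blockId: String): Unit
       }

       object RenamedAtLeastExample {
         def main(args: Array[String]): Unit = {
           val transfer = mock(classOf[BlockTransferSketch])
           transfer.uploadBlock("shuffle_0_0_0")
           // `least` is Mockito's atLeast, renamed so it cannot collide with the
           // atLeast inspector that ScalaTest's Matchers also brings into scope.
           verify(transfer, least(1)).uploadBlock("shuffle_0_0_0")
         }
       }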

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
+  // Used for tracking if our migrations are complete.
+  @volatile private var lastRDDMigrationTime: Long = 0
+  @volatile private var lastShuffleMigrationTime: Long = 0
+  @volatile private var rddBlocksLeft: Boolean = true
+  @volatile private var shuffleBlocksLeft: Boolean = true
+

Review comment:
       We use these to determine if the blocks are migrated, and they can be changed from separate threads, so the alternative would be switching to synchronized.
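
       A minimal sketch of the threading pattern being described: each migration thread writes only its own @volatile fields and the shutdown poller only reads them, so plain volatile visibility is enough and no shared lock is needed. Method names here are illustrative, not from the PR:

       class VolatileProgressSketch {
         // Single writer per field; readers only need visibility, which @volatile
         // provides without taking a lock.
         @volatile private var lastRDDMigrationTime: Long = 0
         @volatile private var rddBlocksLeft: Boolean = true
         @volatile private var lastShuffleMigrationTime: Long = 0
         @volatile private var shuffleBlocksLeft: Boolean = true

         // Called from the RDD migration thread after each pass.
         def recordRDDPass(allDone: Boolean): Unit = {
           lastRDDMigrationTime = System.nanoTime()
           rddBlocksLeft = !allDone
         }

         // Called from the shuffle migration thread after each pass.
         def recordShufflePass(allDone: Boolean): Unit = {
           lastShuffleMigrationTime = System.nanoTime()
           shuffleBlocksLeft = !allDone
         }

         // Read from the executor's shutdown poller.
         def allMigrated: Boolean = !rddBlocksLeft && !shuffleBlocksLeft
       }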

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.
+   *  If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[spark] def lastMigrationInfo(): (Long, Boolean) = {

Review comment:
       I'd really rather not. I think the level above us should only be concerned with whether the blocks are done migrating or not. If we expose that up a level, the logic is going to get even more complicated across class boundaries.
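
       The delegation being argued for looks roughly like the sketch below: BlockManager just forwards the question to its (optional) decommissioner so the "are we done" logic stays in one class. The Option-based wiring and the trait are assumptions about the surrounding code, not quotes from the PR:

       class BlockManagerSketch {
         // Stand-in for BlockManagerDecommissioner, reduced to the one method needed here.
         trait Decommissioner {
           def lastMigrationInfo(): (Long, Boolean)
         }

         private var decommissioner: Option[Decommissioner] = None

         /*
          * Returns the last migration time and a boolean for whether all blocks have
          * been migrated. With no decommissioner running, nothing has been migrated yet.
          */
         def lastMigrationInfo(): (Long, Boolean) =
           decommissioner.map(_.lastMigrationInfo()).getOrElse((0L, false))
       }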

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission it's self.

Review comment:
       In the future it could be sent from somewhere else too, though (especially when we look at YARN-like things).

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -91,10 +98,12 @@ private[storage] class BlockManagerDecommissioner(
                     null)// class tag, we don't need for shuffle
                   logDebug(s"Migrated sub block ${blockId}")
                 }
-                logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+                logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
               } else {
                 logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
               }
+              logInfo(s"Migrated ${shuffleBlockInfo}")

Review comment:
       Actually, let's drop this message.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -88,5 +92,6 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     } finally {
         bmDecomManager.stop()
     }
+    bmDecomManager.stop()

Review comment:
       good point.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
   private val sparkConf = new SparkConf(false)
     .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
     .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
+    // Just replicate blocks as fast as we can during testing, there isn't another
+    // workload we need to worry about.
+    .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

Review comment:
       Well, compared to the default interval. What about if we say "quickly"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org