You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/03 16:28:49 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #31102: [SPARK-34054][CORE] BlockManagerDecommissioner code cleanup

cloud-fan commented on a change in pull request #31102:
URL: https://github.com/apache/spark/pull/31102#discussion_r586565211



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -66,86 +66,103 @@ private[storage] class BlockManagerDecommissioner(
    * the chance of migrating all shuffle blocks before the executor is forced to exit.
    */
   private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable {
-    @volatile var running = true
+    @volatile var keepRunning = true
+
+    /**
+     * Decides whether a shuffle block whose migration just failed gets another attempt.
+     *
+     * While `failureNum` is still below `maxReplicationFailuresForDecommission`, the
+     * (block, failureNum) pair is pushed back onto `shufflesToMigrate` so a migration
+     * thread can pick it up again. Once the limit is reached, a warning is logged and the
+     * block is abandoned.
+     *
+     * @param shuffleBlock the shuffle block whose migration attempt failed
+     * @param failureNum   failure count stored alongside the re-queued entry; presumably the
+     *                     caller has already incremented it for this failure — verify at call site
+     * @return the result of `shufflesToMigrate.add` when the block is re-queued (presumably
+     *         always true for the queue type used here — confirm against its declaration),
+     *         or false when the block is given up on
+     */
+    private def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): Boolean = {
+      if (failureNum < maxReplicationFailuresForDecommission) {
+        logInfo(s"Add $shuffleBlock back to migration queue for " +
+          s"retry ($failureNum / $maxReplicationFailuresForDecommission)")
+        // The block needs to retry so we should not mark it as finished
+        shufflesToMigrate.add((shuffleBlock, failureNum))
+      } else {
+        // Retry budget exhausted: report it and tell the caller not to expect another attempt.
+        logWarning(s"Give up migrating $shuffleBlock since it's been " +
+          s"failed for $maxReplicationFailuresForDecommission times")
+        false
+      }
+    }
+
+    /**
+     * Waits until an entry can be taken from `shufflesToMigrate` and returns it.
+     *
+     * The queue is polled once per second while it is empty. The loop condition uses
+     * `Thread.currentThread().isInterrupted`, which reads the interrupt flag without
+     * clearing it. An interrupt delivered while the thread is inside `Thread.sleep` surfaces
+     * as an `InterruptedException` thrown by the sleep call itself.
+     *
+     * @return the next (shuffle block, failure count) pair polled from the queue
+     * @throws InterruptedException if the thread is interrupted before an entry is available
+     */
+    private def nextShuffleBlockToMigrate(): (ShuffleBlockInfo, Int) = {
+      while (!Thread.currentThread().isInterrupted) {
+        // Option(...) turns a null result from poll() (empty queue) into None.
+        Option(shufflesToMigrate.poll()) match {
+          // `return` exits the method directly; it is not inside a closure here.
+          case Some(head) => return head
+          // Nothing to do right now, but maybe a transfer will fail or a new block
+          // will finish being committed.
+          case None => Thread.sleep(1000)
+        }
+      }
+      // Reached only when the interrupt flag was observed by the loop condition.
+      throw new InterruptedException()
+    }
+
     override def run(): Unit = {
-      var migrating: Option[(ShuffleBlockInfo, Int)] = None
-      logInfo(s"Starting migration thread for ${peer}")
+      logInfo(s"Starting shuffle block migration thread for $peer")
       // Once a block fails to transfer to an executor stop trying to transfer more blocks
-      try {
-        while (running && !Thread.interrupted()) {
-          migrating = Option(shufflesToMigrate.poll())
-          migrating match {
-            case None =>
-              logDebug("Nothing to migrate")
-              // Nothing to do right now, but maybe a transfer will fail or a new block
-              // will finish being committed.
-              val SLEEP_TIME_SECS = 1
-              Thread.sleep(SLEEP_TIME_SECS * 1000L)
-            case Some((shuffleBlockInfo, retryCount)) =>
-              if (retryCount < maxReplicationFailuresForDecommission) {
-                val blocks = bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
-                if (blocks.isEmpty) {
-                  logInfo(s"Ignore empty shuffle block $shuffleBlockInfo")
+      while (keepRunning) {
+        try {
+          val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
+          val blocks = bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+          if (blocks.size < 2) {

Review comment:
       should this be `blocks.isEmpty` like before?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org