Posted to commits@spark.apache.org by db...@apache.org on 2019/03/25 18:13:47 UTC

[spark] branch branch-2.4 updated: [SPARK-27094][YARN][BRANCH-2.4] Work around RackResolver swallowing thread interrupt.

This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 704b75d  [SPARK-27094][YARN][BRANCH-2.4] Work around RackResolver swallowing thread interrupt.
704b75d is described below

commit 704b75df49f73f8ee04f89e23607cdc45d7110c2
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Mon Mar 25 18:13:20 2019 +0000

    [SPARK-27094][YARN][BRANCH-2.4] Work around RackResolver swallowing thread interrupt.
    
    To avoid the case where the YARN libraries would swallow the exception and
    prevent YarnAllocator from shutting down, call the offending code in a
    separate thread, so that the parent thread can respond appropriately to
    the shutdown.
    
    As a safeguard, also explicitly stop the executor launch thread pool when
    shutting down the application, to prevent new executors from coming up
    after the application started its shutdown.
    
    Tested with unit tests + some internal tests on real cluster.
    
    Closes #24206 from vanzin/SPARK-27094-2.4.
    
    Authored-by: Marcelo Vanzin <va...@cloudera.com>
    Signed-off-by: DB Tsai <d_...@apple.com>
---
 .../spark/deploy/yarn/ApplicationMaster.scala      | 154 +++++++++++----------
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  45 +++++-
 2 files changed, 120 insertions(+), 79 deletions(-)
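
The key pattern in the patch below is to run the interrupt-swallowing call on a
helper thread and join on it, so that an interrupt delivered to the allocator
thread surfaces as an InterruptedException from join() instead of being lost
inside Hadoop's RackResolver. A minimal, self-contained sketch of that pattern,
assuming a hypothetical runInterruptibly helper and doWork callback (neither
name is part of the patch):

    object InterruptSafeCall {
      // Run `doWork` on a daemon helper thread and wait for it. If the waiting
      // thread is interrupted, forward the interrupt to the helper and rethrow,
      // so the caller observes the interrupt instead of it being swallowed.
      def runInterruptibly[T](doWork: () => T): T = {
        var result: Option[Either[Throwable, T]] = None
        val worker = new Thread("interrupt-safe-worker") {
          override def run(): Unit = {
            val outcome: Either[Throwable, T] =
              try Right(doWork())
              catch { case e: Throwable => Left(e) }
            result = Some(outcome)
          }
        }
        worker.setDaemon(true)
        worker.start()
        try {
          worker.join() // an interrupt here surfaces as InterruptedException
        } catch {
          case e: InterruptedException =>
            worker.interrupt()
            throw e
        }
        result match {
          case Some(Right(value)) => value
          case Some(Left(error))  => throw error
          case None               => throw new IllegalStateException("worker produced no result")
        }
      }
    }

The rack-resolution change in YarnAllocator.scala below follows this shape,
with the ApplicationMaster's allocation thread as the caller.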

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f94e3f..5ff826a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -521,88 +521,94 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     reporterThread.join()
   }
 
-  private def launchReporterThread(): Thread = {
-    // The number of failures in a row until Reporter thread give up
+  private def allocationThreadImpl(): Unit = {
+    // The number of failures in a row until the allocation thread gives up.
     val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
-
-    val t = new Thread {
-      override def run() {
-        var failureCount = 0
-        while (!finished) {
-          try {
-            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-              finish(FinalApplicationStatus.FAILED,
-                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-                s"Max number of executor failures ($maxNumExecutorFailures) reached")
-            } else if (allocator.isAllNodeBlacklisted) {
-              finish(FinalApplicationStatus.FAILED,
-                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-                "Due to executor failures all available nodes are blacklisted")
-            } else {
-              logDebug("Sending progress")
-              allocator.allocateResources()
-            }
-            failureCount = 0
-          } catch {
-            case i: InterruptedException => // do nothing
-            case e: ApplicationAttemptNotFoundException =>
-              failureCount += 1
-              logError("Exception from Reporter thread.", e)
-              finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
-                e.getMessage)
-            case e: Throwable =>
-              failureCount += 1
-              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
-                finish(FinalApplicationStatus.FAILED,
-                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
-                    s"$failureCount time(s) from Reporter thread.")
-              } else {
-                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
-              }
+    var failureCount = 0
+    while (!finished) {
+      try {
+        if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+          finish(FinalApplicationStatus.FAILED,
+            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+            s"Max number of executor failures ($maxNumExecutorFailures) reached")
+        } else if (allocator.isAllNodeBlacklisted) {
+          finish(FinalApplicationStatus.FAILED,
+            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+            "Due to executor failures all available nodes are blacklisted")
+        } else {
+          logDebug("Sending progress")
+          allocator.allocateResources()
+        }
+        failureCount = 0
+      } catch {
+        case i: InterruptedException => // do nothing
+        case e: ApplicationAttemptNotFoundException =>
+          failureCount += 1
+          logError("Exception from Reporter thread.", e)
+          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+            e.getMessage)
+        case e: Throwable =>
+          failureCount += 1
+          if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+            finish(FinalApplicationStatus.FAILED,
+              ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+                s"$failureCount time(s) from Reporter thread.")
+          } else {
+            logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
           }
-          try {
-            val numPendingAllocate = allocator.getPendingAllocate.size
-            var sleepStart = 0L
-            var sleepInterval = 200L // ms
-            allocatorLock.synchronized {
-              sleepInterval =
-                if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
-                  val currentAllocationInterval =
-                    math.min(heartbeatInterval, nextAllocationInterval)
-                  nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
-                  currentAllocationInterval
-                } else {
-                  nextAllocationInterval = initialAllocationInterval
-                  heartbeatInterval
-                }
-              sleepStart = System.currentTimeMillis()
-              allocatorLock.wait(sleepInterval)
-            }
-            val sleepDuration = System.currentTimeMillis() - sleepStart
-            if (sleepDuration < sleepInterval) {
-              // log when sleep is interrupted
-              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
-                  s"Slept for $sleepDuration/$sleepInterval ms.")
-              // if sleep was less than the minimum interval, sleep for the rest of it
-              val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
-              if (toSleep > 0) {
-                logDebug(s"Going back to sleep for $toSleep ms")
-                // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
-                // by the methods that signal allocatorLock because this is just finishing the min
-                // sleep interval, which should happen even if this is signalled again.
-                Thread.sleep(toSleep)
-              }
+      }
+      try {
+        val numPendingAllocate = allocator.getPendingAllocate.size
+        var sleepStart = 0L
+        var sleepInterval = 200L // ms
+        allocatorLock.synchronized {
+          sleepInterval =
+            if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+              val currentAllocationInterval =
+                math.min(heartbeatInterval, nextAllocationInterval)
+              nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+              currentAllocationInterval
             } else {
-              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
-                  s"Slept for $sleepDuration/$sleepInterval.")
+              nextAllocationInterval = initialAllocationInterval
+              heartbeatInterval
             }
-          } catch {
-            case e: InterruptedException =>
+          sleepStart = System.currentTimeMillis()
+          allocatorLock.wait(sleepInterval)
+        }
+        val sleepDuration = System.currentTimeMillis() - sleepStart
+        if (sleepDuration < sleepInterval) {
+          // log when sleep is interrupted
+          logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+              s"Slept for $sleepDuration/$sleepInterval ms.")
+          // if sleep was less than the minimum interval, sleep for the rest of it
+          val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+          if (toSleep > 0) {
+            logDebug(s"Going back to sleep for $toSleep ms")
+            // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+            // by the methods that signal allocatorLock because this is just finishing the min
+            // sleep interval, which should happen even if this is signalled again.
+            Thread.sleep(toSleep)
           }
+        } else {
+          logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+              s"Slept for $sleepDuration/$sleepInterval.")
+        }
+      } catch {
+        case e: InterruptedException =>
+      }
+    }
+  }
+
+  private def launchReporterThread(): Thread = {
+    val t = new Thread {
+      override def run(): Unit = {
+        try {
+          allocationThreadImpl()
+        } finally {
+          allocator.stop()
         }
       }
     }
-    // setting to daemon status, though this is usually not a good idea.
     t.setDaemon(true)
     t.setName("Reporter")
     t.start()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 3357084..96bc1c7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -366,6 +366,13 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  def stop(): Unit = {
+    // Forcefully shut down the launcher pool, in case this is being called in the middle of
+    // container allocation. This will prevent queued executors from being started - and
+    // potentially interrupt active ExecutorRunnable instances too.
+    launcherPool.shutdownNow()
+  }
+
   private def hostStr(request: ContainerRequest): String = {
     Option(request.getNodes) match {
       case Some(nodes) => nodes.asScala.mkString(",")
@@ -402,12 +409,40 @@ private[yarn] class YarnAllocator(
         containersToUse, remainingAfterHostMatches)
     }
 
-    // Match remaining by rack
+    // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
+    // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
+    // a separate thread to perform the operation.
     val remainingAfterRackMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- remainingAfterHostMatches) {
-      val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
-      matchContainerToRequest(allocatedContainer, rack, containersToUse,
-        remainingAfterRackMatches)
+    if (remainingAfterHostMatches.nonEmpty) {
+      var exception: Option[Throwable] = None
+      val thread = new Thread("spark-rack-resolver") {
+        override def run(): Unit = {
+          try {
+            for (allocatedContainer <- remainingAfterHostMatches) {
+              val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
+              matchContainerToRequest(allocatedContainer, rack, containersToUse,
+                remainingAfterRackMatches)
+            }
+          } catch {
+            case e: Throwable =>
+              exception = Some(e)
+          }
+        }
+      }
+      thread.setDaemon(true)
+      thread.start()
+
+      try {
+        thread.join()
+      } catch {
+        case e: InterruptedException =>
+          thread.interrupt()
+          throw e
+      }
+
+      if (exception.isDefined) {
+        throw exception.get
+      }
     }
 
     // Assign remaining that are neither node-local nor rack-local
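
The new YarnAllocator.stop() above relies on the standard java.util.concurrent
contract: ExecutorService.shutdownNow() discards tasks that have not started yet
and interrupts the threads running active tasks, which is what keeps queued
executor launches from coming up during shutdown. A small stand-alone sketch of
that behavior (the pool and tasks here are illustrative, not taken from Spark):

    import java.util.concurrent.{Executors, TimeUnit}

    // Demonstrates that shutdownNow() prevents queued tasks from starting and
    // interrupts the task that is already running.
    object ShutdownNowSketch {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        pool.execute(new Runnable {
          override def run(): Unit = {
            try Thread.sleep(10000) // stands in for a long executor launch
            catch { case _: InterruptedException => println("running task was interrupted") }
          }
        })
        pool.execute(new Runnable {
          override def run(): Unit = println("queued task ran") // never printed
        })
        Thread.sleep(100)                     // give the first task time to start
        val neverStarted = pool.shutdownNow() // drops queued tasks, interrupts the running one
        println(s"tasks that never started: ${neverStarted.size()}")
        pool.awaitTermination(5, TimeUnit.SECONDS)
      }
    }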

