Posted to commits@spark.apache.org by db...@apache.org on 2019/03/25 18:13:47 UTC
[spark] branch branch-2.4 updated: [SPARK-27094][YARN][BRANCH-2.4]
Work around RackResolver swallowing thread interrupt.
This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 704b75d [SPARK-27094][YARN][BRANCH-2.4] Work around RackResolver swallowing thread interrupt.
704b75d is described below
commit 704b75df49f73f8ee04f89e23607cdc45d7110c2
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Mon Mar 25 18:13:20 2019 +0000
[SPARK-27094][YARN][BRANCH-2.4] Work around RackResolver swallowing thread interrupt.
To avoid the case where the YARN libraries swallow the thread interrupt and
prevent YarnAllocator from shutting down, call the offending code in a
separate thread, so that the parent thread can respond appropriately to
the shutdown.
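
As a minimal sketch of that pattern (outside the actual patch; the helper
name swallowsInterrupts is illustrative, standing in for YARN's RackResolver):

    // Run interrupt-swallowing code on a helper thread so the caller keeps
    // its interrupt responsiveness. join() still throws InterruptedException,
    // so the caller notices a shutdown even if the worker eats its own interrupt.
    object InterruptSafeCall {
      private def swallowsInterrupts(): Unit = {
        try Thread.sleep(5000)
        catch { case _: InterruptedException => () } // interrupt lost here
      }

      def callInterruptibly(): Unit = {
        var failure: Option[Throwable] = None // visible to caller after join()
        val worker = new Thread("worker") {
          override def run(): Unit = {
            try swallowsInterrupts()
            catch { case e: Throwable => failure = Some(e) }
          }
        }
        worker.setDaemon(true)
        worker.start()
        try {
          worker.join()
        } catch {
          case e: InterruptedException =>
            worker.interrupt() // best effort: forward the interrupt to the worker
            throw e
        }
        failure.foreach(e => throw e)
      }
    }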
As a safeguard, also explicitly stop the executor launch thread pool when
shutting down the application, to prevent new executors from coming up
after the application has begun its shutdown.
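
A self-contained sketch of that safeguard (the pool here is a plain Executors
factory for illustration; the real allocator builds its pool via Spark's
ThreadUtils, and the patch's stop() only calls shutdownNow()):

    import java.util.concurrent.Executors

    object LauncherPoolShutdown {
      // Stand-in for YarnAllocator's launcherPool, which runs the
      // ExecutorRunnable setup tasks that start executors.
      private val launcherPool = Executors.newCachedThreadPool()

      def launch(task: Runnable): Unit = launcherPool.execute(task)

      def stop(): Unit = {
        // shutdownNow() discards queued tasks and interrupts running ones, so
        // no queued executor launch can start once shutdown has begun.
        val neverStarted = launcherPool.shutdownNow()
        println(s"Discarded ${neverStarted.size()} queued launches")
      }
    }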
Tested with unit tests plus some internal tests on a real cluster.
Closes #24206 from vanzin/SPARK-27094-2.4.
Authored-by: Marcelo Vanzin <va...@cloudera.com>
Signed-off-by: DB Tsai <d_...@apple.com>
---
.../spark/deploy/yarn/ApplicationMaster.scala | 154 +++++++++++----------
.../apache/spark/deploy/yarn/YarnAllocator.scala | 45 +++++-
2 files changed, 120 insertions(+), 79 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f94e3f..5ff826a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -521,88 +521,94 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
reporterThread.join()
}
- private def launchReporterThread(): Thread = {
- // The number of failures in a row until Reporter thread give up
+ private def allocationThreadImpl(): Unit = {
+ // The number of failures in a row until the allocation thread gives up.
val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
-
- val t = new Thread {
- override def run() {
- var failureCount = 0
- while (!finished) {
- try {
- if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finish(FinalApplicationStatus.FAILED,
- ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
- s"Max number of executor failures ($maxNumExecutorFailures) reached")
- } else if (allocator.isAllNodeBlacklisted) {
- finish(FinalApplicationStatus.FAILED,
- ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
- "Due to executor failures all available nodes are blacklisted")
- } else {
- logDebug("Sending progress")
- allocator.allocateResources()
- }
- failureCount = 0
- } catch {
- case i: InterruptedException => // do nothing
- case e: ApplicationAttemptNotFoundException =>
- failureCount += 1
- logError("Exception from Reporter thread.", e)
- finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
- e.getMessage)
- case e: Throwable =>
- failureCount += 1
- if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
- finish(FinalApplicationStatus.FAILED,
- ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
- s"$failureCount time(s) from Reporter thread.")
- } else {
- logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
- }
+ var failureCount = 0
+ while (!finished) {
+ try {
+ if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ s"Max number of executor failures ($maxNumExecutorFailures) reached")
+ } else if (allocator.isAllNodeBlacklisted) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ "Due to executor failures all available nodes are blacklisted")
+ } else {
+ logDebug("Sending progress")
+ allocator.allocateResources()
+ }
+ failureCount = 0
+ } catch {
+ case i: InterruptedException => // do nothing
+ case e: ApplicationAttemptNotFoundException =>
+ failureCount += 1
+ logError("Exception from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+ e.getMessage)
+ case e: Throwable =>
+ failureCount += 1
+ if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+ s"$failureCount time(s) from Reporter thread.")
+ } else {
+ logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
- try {
- val numPendingAllocate = allocator.getPendingAllocate.size
- var sleepStart = 0L
- var sleepInterval = 200L // ms
- allocatorLock.synchronized {
- sleepInterval =
- if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
- val currentAllocationInterval =
- math.min(heartbeatInterval, nextAllocationInterval)
- nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
- currentAllocationInterval
- } else {
- nextAllocationInterval = initialAllocationInterval
- heartbeatInterval
- }
- sleepStart = System.currentTimeMillis()
- allocatorLock.wait(sleepInterval)
- }
- val sleepDuration = System.currentTimeMillis() - sleepStart
- if (sleepDuration < sleepInterval) {
- // log when sleep is interrupted
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Slept for $sleepDuration/$sleepInterval ms.")
- // if sleep was less than the minimum interval, sleep for the rest of it
- val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
- if (toSleep > 0) {
- logDebug(s"Going back to sleep for $toSleep ms")
- // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
- // by the methods that signal allocatorLock because this is just finishing the min
- // sleep interval, which should happen even if this is signalled again.
- Thread.sleep(toSleep)
- }
+ }
+ try {
+ val numPendingAllocate = allocator.getPendingAllocate.size
+ var sleepStart = 0L
+ var sleepInterval = 200L // ms
+ allocatorLock.synchronized {
+ sleepInterval =
+ if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+ currentAllocationInterval
} else {
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Slept for $sleepDuration/$sleepInterval.")
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
}
- } catch {
- case e: InterruptedException =>
+ sleepStart = System.currentTimeMillis()
+ allocatorLock.wait(sleepInterval)
+ }
+ val sleepDuration = System.currentTimeMillis() - sleepStart
+ if (sleepDuration < sleepInterval) {
+ // log when sleep is interrupted
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval ms.")
+ // if sleep was less than the minimum interval, sleep for the rest of it
+ val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+ if (toSleep > 0) {
+ logDebug(s"Going back to sleep for $toSleep ms")
+ // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+ // by the methods that signal allocatorLock because this is just finishing the min
+ // sleep interval, which should happen even if this is signalled again.
+ Thread.sleep(toSleep)
}
+ } else {
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval.")
+ }
+ } catch {
+ case e: InterruptedException =>
+ }
+ }
+ }
+
+ private def launchReporterThread(): Thread = {
+ val t = new Thread {
+ override def run(): Unit = {
+ try {
+ allocationThreadImpl()
+ } finally {
+ allocator.stop()
}
}
}
- // setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.setName("Reporter")
t.start()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 3357084..96bc1c7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -366,6 +366,13 @@ private[yarn] class YarnAllocator(
}
}
+ def stop(): Unit = {
+ // Forcefully shut down the launcher pool, in case this is being called in the middle of
+ // container allocation. This will prevent queued executors from being started - and
+ // potentially interrupt active ExecutorRunnable instances too.
+ launcherPool.shutdownNow()
+ }
+
private def hostStr(request: ContainerRequest): String = {
Option(request.getNodes) match {
case Some(nodes) => nodes.asScala.mkString(",")
@@ -402,12 +409,40 @@ private[yarn] class YarnAllocator(
containersToUse, remainingAfterHostMatches)
}
- // Match remaining by rack
+ // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
+ // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
+ // a separate thread to perform the operation.
val remainingAfterRackMatches = new ArrayBuffer[Container]
- for (allocatedContainer <- remainingAfterHostMatches) {
- val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
- matchContainerToRequest(allocatedContainer, rack, containersToUse,
- remainingAfterRackMatches)
+ if (remainingAfterHostMatches.nonEmpty) {
+ var exception: Option[Throwable] = None
+ val thread = new Thread("spark-rack-resolver") {
+ override def run(): Unit = {
+ try {
+ for (allocatedContainer <- remainingAfterHostMatches) {
+ val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
+ matchContainerToRequest(allocatedContainer, rack, containersToUse,
+ remainingAfterRackMatches)
+ }
+ } catch {
+ case e: Throwable =>
+ exception = Some(e)
+ }
+ }
+ }
+ thread.setDaemon(true)
+ thread.start()
+
+ try {
+ thread.join()
+ } catch {
+ case e: InterruptedException =>
+ thread.interrupt()
+ throw e
+ }
+
+ if (exception.isDefined) {
+ throw exception.get
+ }
}
// Assign remaining that are neither node-local nor rack-local
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org