Posted to commits@spark.apache.org by va...@apache.org on 2016/09/01 17:35:37 UTC

spark git commit: [SPARK-16533][CORE] resolve deadlocking in driver when executors die

Repository: spark
Updated Branches:
  refs/heads/master adaaffa34 -> a0aac4b77


[SPARK-16533][CORE] resolve deadlocking in driver when executors die

## What changes were proposed in this pull request?
This pull request reverts the changes made as part of #14605, which merely side-stepped the deadlock issue. Instead, I propose the following approach:
* Use `scheduleWithFixedDelay` rather than `scheduleAtFixedRate` when calling `ExecutorAllocationManager.schedule` to schedule executor requests. The intent is that if invocations are delayed beyond the default schedule interval on account of lock contention, we avoid back-to-back calls to `schedule` that would release and then immediately reacquire those locks, further exacerbating contention (see the first sketch after this list).
* Replace a number of calls to `askWithRetry` with `ask` inside the message handling code in `CoarseGrainedSchedulerBackend` and its ilk. This allows us to queue messages with the relevant endpoints, release whatever locks we might be holding, and only then block whilst awaiting the response (see the second sketch after this list). The trade-off is losing the ability to retry should sending the message fail, since retrying outside of the lock could easily cause race conditions if other conflicting messages have been sent whilst awaiting a response. I believe this to be the lesser of two evils: in many cases these RPC calls go to process-local components, so failures are more likely to be deterministic, and timeouts are more likely to be caused by lock contention.
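
For reference, the behavioural difference between the two scheduling modes can be reproduced with a plain `ScheduledExecutorService`. The snippet below is a standalone sketch, not Spark code; the 100 ms interval and 250 ms task duration are made up for illustration:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FixedDelayDemo {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()

    val task = new Runnable {
      override def run(): Unit = {
        // Simulate a run that overshoots the 100 ms interval, e.g. because it
        // blocked on a contended lock.
        Thread.sleep(250)
        println(s"tick at ${System.currentTimeMillis()}")
      }
    }

    // scheduleAtFixedRate: overdue runs pile up and are then fired back-to-back
    // to "catch up", so one slow run is immediately followed by another.
    // executor.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS)

    // scheduleWithFixedDelay: the next run starts 100 ms after the previous one
    // *finishes*, so a slow run never triggers a burst of invocations.
    executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS)

    Thread.sleep(2000)
    executor.shutdownNow()
  }
}
```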
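
The second point amounts to issuing the RPC while holding the lock but blocking for the reply only after the lock has been released. A minimal sketch of that shape, using a hypothetical `AsyncClient` in place of Spark's `RpcEndpointRef` and a plain `Await.result` in place of `RpcTimeout.awaitResult`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for an RpcEndpointRef whose ask() returns a Future.
trait AsyncClient {
  def ask(msg: Any): Future[Boolean]
}

class Backend(client: AsyncClient) {
  private var numPendingExecutors = 0

  def requestExecutors(numAdditional: Int): Boolean = {
    // Mutate shared state and enqueue the message while holding the lock...
    val response: Future[Boolean] = synchronized {
      numPendingExecutors += numAdditional
      client.ask(s"RequestExecutors($numPendingExecutors)")
    }
    // ...but block for the acknowledgement only after releasing the lock, so a
    // message handler that needs the same lock cannot deadlock against us.
    Await.result(response, 30.seconds)
  }
}
```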

## How was this patch tested?
Existing tests, and manual tests under yarn-client mode.

Author: Angus Gerry <an...@gmail.com>

Closes #14710 from angolon/SPARK-16533.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0aac4b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0aac4b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0aac4b7

Branch: refs/heads/master
Commit: a0aac4b775bc8c275f96ad0fbf85c9d8a3690588
Parents: adaaffa
Author: Angus Gerry <an...@gmail.com>
Authored: Thu Sep 1 10:35:31 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Sep 1 10:35:31 2016 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |   2 +-
 .../deploy/client/StandaloneAppClient.scala     |  38 +++----
 .../cluster/CoarseGrainedSchedulerBackend.scala | 105 ++++++++++++-------
 .../cluster/StandaloneSchedulerBackend.scala    |  10 +-
 .../apache/spark/HeartbeatReceiverSuite.scala   |   9 +-
 .../spark/deploy/client/AppClientSuite.scala    |  30 ++++--
 .../MesosCoarseGrainedSchedulerBackend.scala    |   5 +-
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  13 ++-
 .../cluster/YarnSchedulerBackend.scala          |  95 ++++++++---------
 9 files changed, 169 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 932ba16..6f320c5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -230,7 +230,7 @@ private[spark] class ExecutorAllocationManager(
         }
       }
     }
-    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
 
     client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index a9df732..7a60f08 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -21,6 +21,8 @@ import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
@@ -79,11 +81,6 @@ private[spark] class StandaloneAppClient(
     private val registrationRetryThread =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
 
-    // A thread pool to perform receive then reply actions in a thread so as not to block the
-    // event loop.
-    private val askAndReplyThreadPool =
-      ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
-
     override def onStart(): Unit = {
       try {
         registerWithMaster(1)
@@ -220,19 +217,13 @@ private[spark] class StandaloneAppClient(
         endpointRef: RpcEndpointRef,
         context: RpcCallContext,
         msg: T): Unit = {
-      // Create a thread to ask a message and reply with the result.  Allow thread to be
+      // Ask a message and create a thread to reply with the result.  Allow thread to be
       // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
-      askAndReplyThreadPool.execute(new Runnable {
-        override def run(): Unit = {
-          try {
-            context.reply(endpointRef.askWithRetry[Boolean](msg))
-          } catch {
-            case ie: InterruptedException => // Cancelled
-            case NonFatal(t) =>
-              context.sendFailure(t)
-          }
-        }
-      })
+      endpointRef.ask[Boolean](msg).andThen {
+        case Success(b) => context.reply(b)
+        case Failure(ie: InterruptedException) => // Cancelled
+        case Failure(NonFatal(t)) => context.sendFailure(t)
+      }(ThreadUtils.sameThread)
     }
 
     override def onDisconnected(address: RpcAddress): Unit = {
@@ -272,7 +263,6 @@ private[spark] class StandaloneAppClient(
       registrationRetryThread.shutdownNow()
       registerMasterFutures.get.foreach(_.cancel(true))
       registerMasterThreadPool.shutdownNow()
-      askAndReplyThreadPool.shutdownNow()
     }
 
   }
@@ -301,12 +291,12 @@ private[spark] class StandaloneAppClient(
    *
    * @return whether the request is acknowledged.
    */
-  def requestTotalExecutors(requestedTotal: Int): Boolean = {
+  def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     if (endpoint.get != null && appId.get != null) {
-      endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
+      endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
     } else {
       logWarning("Attempted to request executors before driver fully initialized.")
-      false
+      Future.successful(false)
     }
   }
 
@@ -314,12 +304,12 @@ private[spark] class StandaloneAppClient(
    * Kill the given list of executors through the Master.
    * @return whether the kill request is acknowledged.
    */
-  def killExecutors(executorIds: Seq[String]): Boolean = {
+  def killExecutors(executorIds: Seq[String]): Future[Boolean] = {
     if (endpoint.get != null && appId.get != null) {
-      endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
+      endpoint.get.ask[Boolean](KillExecutors(appId.get, executorIds))
     } else {
       logWarning("Attempted to kill executors before driver fully initialized.")
-      false
+      Future.successful(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2db3a3b..6d26705 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
@@ -49,6 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val totalRegisteredExecutors = new AtomicInteger(0)
   protected val conf = scheduler.sc.conf
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+  private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   private val _minRegisteredRatio =
@@ -272,6 +275,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Remove a disconnected slave from the cluster
     private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+      logDebug(s"Asked to remove executor $executorId with reason $reason")
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
@@ -446,19 +450,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.
    */
-  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     if (numAdditionalExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of additional executor(s) " +
         s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
     }
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
-    logDebug(s"Number of pending executors is now $numPendingExecutors")
 
-    numPendingExecutors += numAdditionalExecutors
-    // Account for executors pending to be added or removed
-    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
-    doRequestTotalExecutors(newTotal)
+    val response = synchronized {
+      numPendingExecutors += numAdditionalExecutors
+      logDebug(s"Number of pending executors is now $numPendingExecutors")
+
+      // Account for executors pending to be added or removed
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+    }
+
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
@@ -479,19 +488,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       numExecutors: Int,
       localityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int]
-    ): Boolean = synchronized {
+    ): Boolean = {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
           s"$numExecutors from the cluster manager. Please specify a positive number!")
     }
 
-    this.localityAwareTasks = localityAwareTasks
-    this.hostToLocalTaskCount = hostToLocalTaskCount
+    val response = synchronized {
+      this.localityAwareTasks = localityAwareTasks
+      this.hostToLocalTaskCount = hostToLocalTaskCount
+
+      numPendingExecutors =
+        math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
 
-    numPendingExecutors =
-      math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
-    doRequestTotalExecutors(numExecutors)
+      doRequestTotalExecutors(numExecutors)
+    }
+
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
@@ -504,16 +518,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * insufficient resources to satisfy the first request. We make the assumption here that the
    * cluster manager will eventually fulfill all requests when resources free up.
    *
-   * @return whether the request is acknowledged.
+   * @return a future whose evaluation indicates whether the request is acknowledged.
    */
-  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+    Future.successful(false)
 
   /**
    * Request that the cluster manager kill the specified executors.
    * @return whether the kill request is acknowledged. If list to kill is empty, it will return
    *         false.
    */
-  final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+  final override def killExecutors(executorIds: Seq[String]): Boolean = {
     killExecutors(executorIds, replace = false, force = false)
   }
 
@@ -533,39 +548,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   final def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
-      force: Boolean): Boolean = synchronized {
+      force: Boolean): Boolean = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
-    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-    unknownExecutors.foreach { id =>
-      logWarning(s"Executor to kill $id does not exist!")
-    }
 
-    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-    val executorsToKill = knownExecutors
-      .filter { id => !executorsPendingToRemove.contains(id) }
-      .filter { id => force || !scheduler.isExecutorBusy(id) }
-    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
-
-    // If we do not wish to replace the executors we kill, sync the target number of executors
-    // with the cluster manager to avoid allocating new ones. When computing the new target,
-    // take into account executors that are pending to be added or removed.
-    if (!replace) {
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
-    } else {
-      numPendingExecutors += knownExecutors.size
+    val response = synchronized {
+      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+      unknownExecutors.foreach { id =>
+        logWarning(s"Executor to kill $id does not exist!")
+      }
+
+      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+      val executorsToKill = knownExecutors
+        .filter { id => !executorsPendingToRemove.contains(id) }
+        .filter { id => force || !scheduler.isExecutorBusy(id) }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+
+      // If we do not wish to replace the executors we kill, sync the target number of executors
+      // with the cluster manager to avoid allocating new ones. When computing the new target,
+      // take into account executors that are pending to be added or removed.
+      val adjustTotalExecutors =
+        if (!replace) {
+          doRequestTotalExecutors(
+            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+        } else {
+          numPendingExecutors += knownExecutors.size
+          Future.successful(true)
+        }
+
+      val killExecutors: Boolean => Future[Boolean] =
+        if (!executorsToKill.isEmpty) {
+          _ => doKillExecutors(executorsToKill)
+        } else {
+          _ => Future.successful(false)
+        }
+
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
     }
 
-    !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
    * Kill the given list of executors through the cluster manager.
    * @return whether the kill request is acknowledged.
    */
-  protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
-
+  protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
+    Future.successful(false)
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe..5068bf2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
 
+import scala.concurrent.Future
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
@@ -173,12 +175,12 @@ private[spark] class StandaloneSchedulerBackend(
    *
    * @return whether the request is acknowledged.
    */
-  protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+  protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     Option(client) match {
       case Some(c) => c.requestTotalExecutors(requestedTotal)
       case None =>
         logWarning("Attempted to request executors before driver fully initialized.")
-        false
+        Future.successful(false)
     }
   }
 
@@ -186,12 +188,12 @@ private[spark] class StandaloneSchedulerBackend(
    * Kill the given list of executors through the Master.
    * @return whether the kill request is acknowledged.
    */
-  protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+  protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
     Option(client) match {
       case Some(c) => c.killExecutors(executorIds)
       case None =>
         logWarning("Attempted to kill executors before driver fully initialized.")
-        false
+        Future.successful(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 5f59c17..915d7a1 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
 
 import scala.collection.Map
 import scala.collection.mutable
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
 import org.mockito.Matchers
@@ -269,13 +270,13 @@ private class FakeSchedulerBackend(
     clusterManagerEndpoint: RpcEndpointRef)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
-  protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    clusterManagerEndpoint.askWithRetry[Boolean](
+  protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    clusterManagerEndpoint.ask[Boolean](
       RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
-  protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+  protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index f6ef9d1..416efaa 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.concurrent.duration._
 
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
 
 import org.apache.spark._
 import org.apache.spark.deploy.{ApplicationDescription, Command}
@@ -36,7 +36,12 @@ import org.apache.spark.util.Utils
 /**
  * End-to-end tests for application client in standalone mode.
  */
-class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
+class AppClientSuite
+    extends SparkFunSuite
+    with LocalSparkContext
+    with BeforeAndAfterAll
+    with Eventually
+    with ScalaFutures {
   private val numWorkers = 2
   private val conf = new SparkConf()
   private val securityManager = new SecurityManager(conf)
@@ -93,7 +98,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
 
     // Send message to Master to request Executors, verify request by change in executor limit
     val numExecutorsRequested = 1
-    assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+    whenReady(
+        ci.client.requestTotalExecutors(numExecutorsRequested),
+        timeout(10.seconds),
+        interval(10.millis)) { acknowledged =>
+      assert(acknowledged)
+    }
 
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -101,10 +111,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
     }
 
     // Send request to kill executor, verify request was made
-    assert {
-      val apps = getApplications()
-      val executorId: String = apps.head.executors.head._2.fullId
-      ci.client.killExecutors(Seq(executorId))
+    val executorId: String = getApplications().head.executors.head._2.fullId
+    whenReady(
+        ci.client.killExecutors(Seq(executorId)),
+        timeout(10.seconds),
+        interval(10.millis)) { acknowledged =>
+      assert(acknowledged)
     }
 
     // Issue stop command for Client to disconnect from Master
@@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
     val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
 
     // requests to master should fail immediately
-    assert(ci.client.requestTotalExecutors(3) === false)
+    whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { success =>
+      assert(success === false)
+    }
   }
 
   // ===============================

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index fde1fb3..a64b576 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.Future
 
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 
@@ -606,7 +607,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       super.applicationId
     }
 
-  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
     // We don't truly know if we can fulfill the full amount of executors
     // since at coarse grain it depends on the amount of slaves available.
     logInfo("Capping the total amount of executors to " + requestedTotal)
@@ -614,7 +615,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     true
   }
 
-  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
     if (mesosDriver == null) {
       logWarning("Asked to kill executors before the Mesos driver was started.")
       false

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d98ddb2..6948be0 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 import scala.concurrent.Promise
 import scala.reflect.ClassTag
 
@@ -27,6 +28,7 @@ import org.apache.mesos.Protos._
 import org.mockito.Matchers
 import org.mockito.Matchers._
 import org.mockito.Mockito._
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
@@ -40,7 +42,8 @@ import org.apache.spark.scheduler.cluster.mesos.Utils._
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
     with MockitoSugar
-    with BeforeAndAfter {
+    with BeforeAndAfter
+    with ScalaFutures {
 
   private var sparkConf: SparkConf = _
   private var driver: SchedulerDriver = _
@@ -50,6 +53,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   private var driverEndpoint: RpcEndpointRef = _
   @volatile private var stopCalled = false
 
+  // All 'requests' to the scheduler run immediately on the same thread, so
+  // demand that all futures have their value available immediately.
+  implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)
+
   test("mesos supports killing and limiting executors") {
     setBackend()
     sparkConf.set("spark.driver.host", "driverHost")
@@ -64,8 +71,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched(driver, "o1")
 
     // kills executors
-    backend.doRequestTotalExecutors(0)
-    assert(backend.doKillExecutors(Seq("0")))
+    assert(backend.doRequestTotalExecutors(0).futureValue)
+    assert(backend.doKillExecutors(Seq("0")).futureValue)
     val taskID0 = createTaskId("0")
     verify(driver, times(1)).killTask(taskID0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a0aac4b7/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index ea63ff5..2f9ea19 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
@@ -124,28 +125,16 @@ private[spark] abstract class YarnSchedulerBackend(
    * Request executors from the ApplicationMaster by specifying the total number desired.
    * This includes executors already pending or running.
    */
-  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
-    yarnSchedulerEndpoint.amEndpoint match {
-      case Some(am) =>
-        try {
-          am.askWithRetry[Boolean](r)
-        } catch {
-          case NonFatal(e) =>
-            logError(s"Sending $r to AM was unsuccessful", e)
-            return false
-        }
-      case None =>
-        logWarning("Attempted to request executors before the AM has registered!")
-        return false
-    }
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    yarnSchedulerEndpointRef.ask[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
   /**
    * Request that the ApplicationMaster kill the specified executors.
    */
-  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -221,37 +210,37 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    var amEndpoint: Option[RpcEndpointRef] = None
-
-    private val askAmThreadPool =
-      ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
-    implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+    private var amEndpoint: Option[RpcEndpointRef] = None
 
     private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
         executorId: String,
         executorRpcAddress: RpcAddress): Unit = {
-      amEndpoint match {
+      val removeExecutorMessage = amEndpoint match {
         case Some(am) =>
           val lossReasonRequest = GetExecutorLossReason(executorId)
-          val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
-          future onSuccess {
-            case reason: ExecutorLossReason =>
-              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
-          }
-          future onFailure {
-            case NonFatal(e) =>
-              logWarning(s"Attempted to get executor loss reason" +
-                s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
-                s" but got no response. Marking as slave lost.", e)
-              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
-            case t => throw t
-          }
+          am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+            .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
+            .recover {
+              case NonFatal(e) =>
+                logWarning(s"Attempted to get executor loss reason" +
+                  s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+                  s" but got no response. Marking as slave lost.", e)
+                RemoveExecutor(executorId, SlaveLost())
+            }(ThreadUtils.sameThread)
         case None =>
           logWarning("Attempted to check for an executor loss reason" +
             " before the AM has registered!")
-          driverEndpoint.askWithRetry[Boolean](
-            RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+          Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
+
+      removeExecutorMessage
+        .flatMap { message =>
+          driverEndpoint.ask[Boolean](message)
+        }(ThreadUtils.sameThread)
+        .onFailure {
+          case NonFatal(e) => logError(
+            s"Error requesting driver to remove executor $executorId after disconnection.", e)
+        }(ThreadUtils.sameThread)
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -269,9 +258,13 @@ private[spark] abstract class YarnSchedulerBackend(
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
 
-      case RemoveExecutor(executorId, reason) =>
+      case r @ RemoveExecutor(executorId, reason) =>
         logWarning(reason.toString)
-        removeExecutor(executorId, reason)
+        driverEndpoint.ask[Boolean](r).onFailure {
+          case e =>
+            logError("Error requesting driver to remove executor" +
+              s" $executorId for reason $reason", e)
+        }(ThreadUtils.sameThread)
     }
 
 
@@ -279,13 +272,12 @@ private[spark] abstract class YarnSchedulerBackend(
       case r: RequestExecutors =>
         amEndpoint match {
           case Some(am) =>
-            Future {
-              context.reply(am.askWithRetry[Boolean](r))
-            } onFailure {
-              case NonFatal(e) =>
+            am.ask[Boolean](r).andThen {
+              case Success(b) => context.reply(b)
+              case Failure(NonFatal(e)) =>
                 logError(s"Sending $r to AM was unsuccessful", e)
                 context.sendFailure(e)
-            }
+            }(ThreadUtils.sameThread)
           case None =>
             logWarning("Attempted to request executors before the AM has registered!")
             context.reply(false)
@@ -294,13 +286,12 @@ private[spark] abstract class YarnSchedulerBackend(
       case k: KillExecutors =>
         amEndpoint match {
           case Some(am) =>
-            Future {
-              context.reply(am.askWithRetry[Boolean](k))
-            } onFailure {
-              case NonFatal(e) =>
+            am.ask[Boolean](k).andThen {
+              case Success(b) => context.reply(b)
+              case Failure(NonFatal(e)) =>
                 logError(s"Sending $k to AM was unsuccessful", e)
                 context.sendFailure(e)
-            }
+            }(ThreadUtils.sameThread)
           case None =>
             logWarning("Attempted to kill executors before the AM has registered!")
             context.reply(false)
@@ -316,10 +307,6 @@ private[spark] abstract class YarnSchedulerBackend(
         amEndpoint = None
       }
     }
-
-    override def onStop(): Unit = {
-      askAmThreadPool.shutdownNow()
-    }
   }
 }
 

