You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/09/07 23:43:13 UTC

spark git commit: [SPARK-16533][CORE] - backport driver deadlock fix to 2.0

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 078ac0e63 -> 067752ce0


[SPARK-16533][CORE] - backport driver deadlock fix to 2.0

## What changes were proposed in this pull request?
Backport changes from #14710 and #14925 to 2.0

Author: Marcelo Vanzin <va...@cloudera.com>
Author: Angus Gerry <an...@gmail.com>

Closes #14933 from angolon/SPARK-16533-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/067752ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/067752ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/067752ce

Branch: refs/heads/branch-2.0
Commit: 067752ce08dc035ee807d20be2202c385f88f01c
Parents: 078ac0e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 7 16:43:05 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Sep 7 16:43:05 2016 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |   2 +-
 .../deploy/client/StandaloneAppClient.scala     |  38 +++----
 .../cluster/CoarseGrainedSchedulerBackend.scala | 105 ++++++++++++-------
 .../cluster/StandaloneSchedulerBackend.scala    |  10 +-
 .../MesosCoarseGrainedSchedulerBackend.scala    |   5 +-
 .../apache/spark/HeartbeatReceiverSuite.scala   |   9 +-
 .../spark/deploy/client/AppClientSuite.scala    |  30 ++++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  14 ++-
 .../cluster/YarnSchedulerBackend.scala          |  95 ++++++++---------
 9 files changed, 170 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 932ba16..6f320c5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -230,7 +230,7 @@ private[spark] class ExecutorAllocationManager(
         }
       }
     }
-    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
 
     client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index b1c414d..93f58ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -21,6 +21,8 @@ import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
@@ -79,11 +81,6 @@ private[spark] class StandaloneAppClient(
     private val registrationRetryThread =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
 
-    // A thread pool to perform receive then reply actions in a thread so as not to block the
-    // event loop.
-    private val askAndReplyThreadPool =
-      ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
-
     override def onStart(): Unit = {
       try {
         registerWithMaster(1)
@@ -220,19 +217,13 @@ private[spark] class StandaloneAppClient(
         endpointRef: RpcEndpointRef,
         context: RpcCallContext,
         msg: T): Unit = {
-      // Create a thread to ask a message and reply with the result.  Allow thread to be
+      // Ask a message and create a thread to reply with the result.  Allow thread to be
       // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
-      askAndReplyThreadPool.execute(new Runnable {
-        override def run(): Unit = {
-          try {
-            context.reply(endpointRef.askWithRetry[Boolean](msg))
-          } catch {
-            case ie: InterruptedException => // Cancelled
-            case NonFatal(t) =>
-              context.sendFailure(t)
-          }
-        }
-      })
+      endpointRef.ask[Boolean](msg).andThen {
+        case Success(b) => context.reply(b)
+        case Failure(ie: InterruptedException) => // Cancelled
+        case Failure(NonFatal(t)) => context.sendFailure(t)
+      }(ThreadUtils.sameThread)
     }
 
     override def onDisconnected(address: RpcAddress): Unit = {
@@ -272,7 +263,6 @@ private[spark] class StandaloneAppClient(
       registrationRetryThread.shutdownNow()
       registerMasterFutures.get.foreach(_.cancel(true))
       registerMasterThreadPool.shutdownNow()
-      askAndReplyThreadPool.shutdownNow()
     }
 
   }
@@ -301,12 +291,12 @@ private[spark] class StandaloneAppClient(
    *
    * @return whether the request is acknowledged.
    */
-  def requestTotalExecutors(requestedTotal: Int): Boolean = {
+  def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     if (endpoint.get != null && appId.get != null) {
-      endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
+      endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
     } else {
       logWarning("Attempted to request executors before driver fully initialized.")
-      false
+      Future.successful(false)
     }
   }
 
@@ -314,12 +304,12 @@ private[spark] class StandaloneAppClient(
    * Kill the given list of executors through the Master.
    * @return whether the kill request is acknowledged.
    */
-  def killExecutors(executorIds: Seq[String]): Boolean = {
+  def killExecutors(executorIds: Seq[String]): Future[Boolean] = {
     if (endpoint.get != null && appId.get != null) {
-      endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
+      endpoint.get.ask[Boolean](KillExecutors(appId.get, executorIds))
     } else {
       logWarning("Attempted to kill executors before driver fully initialized.")
-      false
+      Future.successful(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5a74ddd..c6b3fdf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
@@ -49,6 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val totalRegisteredExecutors = new AtomicInteger(0)
   protected val conf = scheduler.sc.conf
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+  private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   private val _minRegisteredRatio =
@@ -272,6 +275,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Remove a disconnected slave from the cluster
     private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+      logDebug(s"Asked to remove executor $executorId with reason $reason")
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
@@ -446,19 +450,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.
    */
-  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     if (numAdditionalExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of additional executor(s) " +
         s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
     }
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
-    logDebug(s"Number of pending executors is now $numPendingExecutors")
 
-    numPendingExecutors += numAdditionalExecutors
-    // Account for executors pending to be added or removed
-    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
-    doRequestTotalExecutors(newTotal)
+    val response = synchronized {
+      numPendingExecutors += numAdditionalExecutors
+      logDebug(s"Number of pending executors is now $numPendingExecutors")
+
+      // Account for executors pending to be added or removed
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+    }
+
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
@@ -479,19 +488,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       numExecutors: Int,
       localityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int]
-    ): Boolean = synchronized {
+    ): Boolean = {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
           s"$numExecutors from the cluster manager. Please specify a positive number!")
     }
 
-    this.localityAwareTasks = localityAwareTasks
-    this.hostToLocalTaskCount = hostToLocalTaskCount
+    val response = synchronized {
+      this.localityAwareTasks = localityAwareTasks
+      this.hostToLocalTaskCount = hostToLocalTaskCount
+
+      numPendingExecutors =
+        math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
 
-    numPendingExecutors =
-      math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
-    doRequestTotalExecutors(numExecutors)
+      doRequestTotalExecutors(numExecutors)
+    }
+
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
@@ -504,16 +518,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * insufficient resources to satisfy the first request. We make the assumption here that the
    * cluster manager will eventually fulfill all requests when resources free up.
    *
-   * @return whether the request is acknowledged.
+   * @return a future whose evaluation indicates whether the request is acknowledged.
    */
-  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+    Future.successful(false)
 
   /**
    * Request that the cluster manager kill the specified executors.
    * @return whether the kill request is acknowledged. If list to kill is empty, it will return
    *         false.
    */
-  final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+  final override def killExecutors(executorIds: Seq[String]): Boolean = {
     killExecutors(executorIds, replace = false, force = false)
   }
 
@@ -533,39 +548,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   final def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
-      force: Boolean): Boolean = synchronized {
+      force: Boolean): Boolean = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
-    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-    unknownExecutors.foreach { id =>
-      logWarning(s"Executor to kill $id does not exist!")
-    }
 
-    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-    val executorsToKill = knownExecutors
-      .filter { id => !executorsPendingToRemove.contains(id) }
-      .filter { id => force || !scheduler.isExecutorBusy(id) }
-    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
-
-    // If we do not wish to replace the executors we kill, sync the target number of executors
-    // with the cluster manager to avoid allocating new ones. When computing the new target,
-    // take into account executors that are pending to be added or removed.
-    if (!replace) {
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
-    } else {
-      numPendingExecutors += knownExecutors.size
+    val response = synchronized {
+      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+      unknownExecutors.foreach { id =>
+        logWarning(s"Executor to kill $id does not exist!")
+      }
+
+      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+      val executorsToKill = knownExecutors
+        .filter { id => !executorsPendingToRemove.contains(id) }
+        .filter { id => force || !scheduler.isExecutorBusy(id) }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+
+      // If we do not wish to replace the executors we kill, sync the target number of executors
+      // with the cluster manager to avoid allocating new ones. When computing the new target,
+      // take into account executors that are pending to be added or removed.
+      val adjustTotalExecutors =
+        if (!replace) {
+          doRequestTotalExecutors(
+            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+        } else {
+          numPendingExecutors += knownExecutors.size
+          Future.successful(true)
+        }
+
+      val killExecutors: Boolean => Future[Boolean] =
+        if (!executorsToKill.isEmpty) {
+          _ => doKillExecutors(executorsToKill)
+        } else {
+          _ => Future.successful(false)
+        }
+
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
     }
 
-    !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
    * Kill the given list of executors through the cluster manager.
    * @return whether the kill request is acknowledged.
    */
-  protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
-
+  protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
+    Future.successful(false)
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index ea7e96a..04d40e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
 
+import scala.concurrent.Future
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
@@ -174,12 +176,12 @@ private[spark] class StandaloneSchedulerBackend(
    *
    * @return whether the request is acknowledged.
    */
-  protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+  protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     Option(client) match {
       case Some(c) => c.requestTotalExecutors(requestedTotal)
       case None =>
         logWarning("Attempted to request executors before driver fully initialized.")
-        false
+        Future.successful(false)
     }
   }
 
@@ -187,12 +189,12 @@ private[spark] class StandaloneSchedulerBackend(
    * Kill the given list of executors through the Master.
    * @return whether the kill request is acknowledged.
    */
-  protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+  protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
     Option(client) match {
       case Some(c) => c.killExecutors(executorIds)
       case None =>
         logWarning("Attempted to kill executors before driver fully initialized.")
-        false
+        Future.successful(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 2dcd67c..473b1be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{Buffer, HashMap, HashSet}
+import scala.concurrent.Future
 
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 
@@ -577,7 +578,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       super.applicationId
     }
 
-  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
     // We don't truly know if we can fulfill the full amount of executors
     // since at coarse grain it depends on the amount of slaves available.
     logInfo("Capping the total amount of executors to " + requestedTotal)
@@ -585,7 +586,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     true
   }
 
-  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
     if (mesosDriver == null) {
       logWarning("Asked to kill executors before the Mesos driver was started.")
       false

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 5e2ba31..e303495 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
 
 import scala.collection.Map
 import scala.collection.mutable
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -270,13 +271,13 @@ private class FakeSchedulerBackend(
     clusterManagerEndpoint: RpcEndpointRef)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
-  protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    clusterManagerEndpoint.askWithRetry[Boolean](
+  protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    clusterManagerEndpoint.ask[Boolean](
       RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
-  protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+  protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 57e5fb5..bc58fb2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.concurrent.duration._
 
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
 
 import org.apache.spark._
 import org.apache.spark.deploy.{ApplicationDescription, Command}
@@ -36,7 +36,12 @@ import org.apache.spark.util.Utils
 /**
  * End-to-end tests for application client in standalone mode.
  */
-class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
+class AppClientSuite
+    extends SparkFunSuite
+    with LocalSparkContext
+    with BeforeAndAfterAll
+    with Eventually
+    with ScalaFutures {
   private val numWorkers = 2
   private val conf = new SparkConf()
   private val securityManager = new SecurityManager(conf)
@@ -93,7 +98,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
 
     // Send message to Master to request Executors, verify request by change in executor limit
     val numExecutorsRequested = 1
-    assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+    whenReady(
+        ci.client.requestTotalExecutors(numExecutorsRequested),
+        timeout(10.seconds),
+        interval(10.millis)) { acknowledged =>
+      assert(acknowledged)
+    }
 
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -101,10 +111,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
     }
 
     // Send request to kill executor, verify request was made
-    assert {
-      val apps = getApplications()
-      val executorId: String = apps.head.executors.head._2.fullId
-      ci.client.killExecutors(Seq(executorId))
+    val executorId: String = getApplications().head.executors.head._2.fullId
+    whenReady(
+        ci.client.killExecutors(Seq(executorId)),
+        timeout(10.seconds),
+        interval(10.millis)) { acknowledged =>
+      assert(acknowledged)
     }
 
     // Issue stop command for Client to disconnect from Master
@@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
     val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
 
     // requests to master should fail immediately
-    assert(ci.client.requestTotalExecutors(3) === false)
+    whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { success =>
+      assert(success === false)
+    }
   }
 
   // ===============================

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 12c4a79..3ffbe70 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.util.Collections
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 import scala.concurrent.Promise
 import scala.reflect.ClassTag
 
@@ -30,6 +32,7 @@ import org.apache.mesos.Protos.Value.Scalar
 import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
@@ -42,7 +45,8 @@ import org.apache.spark.scheduler.TaskSchedulerImpl
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
     with MockitoSugar
-    with BeforeAndAfter {
+    with BeforeAndAfter
+    with ScalaFutures {
 
   private var sparkConf: SparkConf = _
   private var driver: SchedulerDriver = _
@@ -52,6 +56,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   private var driverEndpoint: RpcEndpointRef = _
   @volatile private var stopCalled = false
 
+  // All 'requests' to the scheduler run immediately on the same thread, so
+  // demand that all futures have their value available immediately.
+  implicit override val patienceConfig = PatienceConfig(timeout = Duration(0, TimeUnit.SECONDS))
+
   test("mesos supports killing and limiting executors") {
     setBackend()
     sparkConf.set("spark.driver.host", "driverHost")
@@ -66,8 +74,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched("o1")
 
     // kills executors
-    backend.doRequestTotalExecutors(0)
-    assert(backend.doKillExecutors(Seq("0")))
+    assert(backend.doRequestTotalExecutors(0).futureValue)
+    assert(backend.doKillExecutors(Seq("0")).futureValue)
     val taskID0 = createTaskId("0")
     verify(driver, times(1)).killTask(taskID0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/067752ce/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index ea63ff5..2f9ea19 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
@@ -124,28 +125,16 @@ private[spark] abstract class YarnSchedulerBackend(
    * Request executors from the ApplicationMaster by specifying the total number desired.
    * This includes executors already pending or running.
    */
-  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
-    yarnSchedulerEndpoint.amEndpoint match {
-      case Some(am) =>
-        try {
-          am.askWithRetry[Boolean](r)
-        } catch {
-          case NonFatal(e) =>
-            logError(s"Sending $r to AM was unsuccessful", e)
-            return false
-        }
-      case None =>
-        logWarning("Attempted to request executors before the AM has registered!")
-        return false
-    }
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    yarnSchedulerEndpointRef.ask[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
   /**
    * Request that the ApplicationMaster kill the specified executors.
    */
-  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -221,37 +210,37 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    var amEndpoint: Option[RpcEndpointRef] = None
-
-    private val askAmThreadPool =
-      ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
-    implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+    private var amEndpoint: Option[RpcEndpointRef] = None
 
     private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
         executorId: String,
         executorRpcAddress: RpcAddress): Unit = {
-      amEndpoint match {
+      val removeExecutorMessage = amEndpoint match {
         case Some(am) =>
           val lossReasonRequest = GetExecutorLossReason(executorId)
-          val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
-          future onSuccess {
-            case reason: ExecutorLossReason =>
-              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
-          }
-          future onFailure {
-            case NonFatal(e) =>
-              logWarning(s"Attempted to get executor loss reason" +
-                s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
-                s" but got no response. Marking as slave lost.", e)
-              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
-            case t => throw t
-          }
+          am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+            .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
+            .recover {
+              case NonFatal(e) =>
+                logWarning(s"Attempted to get executor loss reason" +
+                  s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+                  s" but got no response. Marking as slave lost.", e)
+                RemoveExecutor(executorId, SlaveLost())
+            }(ThreadUtils.sameThread)
         case None =>
           logWarning("Attempted to check for an executor loss reason" +
             " before the AM has registered!")
-          driverEndpoint.askWithRetry[Boolean](
-            RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+          Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
+
+      removeExecutorMessage
+        .flatMap { message =>
+          driverEndpoint.ask[Boolean](message)
+        }(ThreadUtils.sameThread)
+        .onFailure {
+          case NonFatal(e) => logError(
+            s"Error requesting driver to remove executor $executorId after disconnection.", e)
+        }(ThreadUtils.sameThread)
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -269,9 +258,13 @@ private[spark] abstract class YarnSchedulerBackend(
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
 
-      case RemoveExecutor(executorId, reason) =>
+      case r @ RemoveExecutor(executorId, reason) =>
         logWarning(reason.toString)
-        removeExecutor(executorId, reason)
+        driverEndpoint.ask[Boolean](r).onFailure {
+          case e =>
+            logError("Error requesting driver to remove executor" +
+              s" $executorId for reason $reason", e)
+        }(ThreadUtils.sameThread)
     }
 
 
@@ -279,13 +272,12 @@ private[spark] abstract class YarnSchedulerBackend(
       case r: RequestExecutors =>
         amEndpoint match {
           case Some(am) =>
-            Future {
-              context.reply(am.askWithRetry[Boolean](r))
-            } onFailure {
-              case NonFatal(e) =>
+            am.ask[Boolean](r).andThen {
+              case Success(b) => context.reply(b)
+              case Failure(NonFatal(e)) =>
                 logError(s"Sending $r to AM was unsuccessful", e)
                 context.sendFailure(e)
-            }
+            }(ThreadUtils.sameThread)
           case None =>
             logWarning("Attempted to request executors before the AM has registered!")
             context.reply(false)
@@ -294,13 +286,12 @@ private[spark] abstract class YarnSchedulerBackend(
       case k: KillExecutors =>
         amEndpoint match {
           case Some(am) =>
-            Future {
-              context.reply(am.askWithRetry[Boolean](k))
-            } onFailure {
-              case NonFatal(e) =>
+            am.ask[Boolean](k).andThen {
+              case Success(b) => context.reply(b)
+              case Failure(NonFatal(e)) =>
                 logError(s"Sending $k to AM was unsuccessful", e)
                 context.sendFailure(e)
-            }
+            }(ThreadUtils.sameThread)
           case None =>
             logWarning("Attempted to kill executors before the AM has registered!")
             context.reply(false)
@@ -316,10 +307,6 @@ private[spark] abstract class YarnSchedulerBackend(
         amEndpoint = None
       }
     }
-
-    override def onStop(): Unit = {
-      askAmThreadPool.shutdownNow()
-    }
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org