You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/09/22 17:10:53 UTC

spark git commit: [SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.

Repository: spark
Updated Branches:
  refs/heads/master 8a02410a9 -> 17b72d31e


[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.

## What changes were proposed in this pull request?
We now kill multiple executors together in a single request instead of issuing a separate, expensive RPC call to kill each executor.

## How was this patch tested?
Ran a sample Spark job with dynamic allocation enabled and observed executors being killed/removed.

Author: Dhruve Ashar <da...@yahoo-inc.com>
Author: Dhruve Ashar <dh...@gmail.com>

Closes #15152 from dhruve/impr/SPARK-17365.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17b72d31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17b72d31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17b72d31

Branch: refs/heads/master
Commit: 17b72d31e0c59711eddeb525becb8085930eadcc
Parents: 8a02410
Author: Dhruve Ashar <da...@yahoo-inc.com>
Authored: Thu Sep 22 10:10:37 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Sep 22 10:10:37 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala |   9 +-
 .../spark/ExecutorAllocationManager.scala       |  86 +++++++++---
 .../scala/org/apache/spark/SparkContext.scala   |  24 ++--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  12 +-
 ...pache.spark.scheduler.ExternalClusterManager |   3 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 135 +++++++++++++++++--
 .../StandaloneDynamicAllocationSuite.scala      |   6 +-
 project/MimaExcludes.scala                      |   3 +
 .../scheduler/ExecutorAllocationManager.scala   |   2 +-
 .../streaming/scheduler/JobScheduler.scala      |   9 +-
 .../ExecutorAllocationManagerSuite.scala        |   5 +-
 11 files changed, 239 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 8baddf4..5d47f62 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient {
 
   /**
    * Request that the cluster manager kill the specified executors.
-   * @return whether the request is acknowledged by the cluster manager.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
-  def killExecutors(executorIds: Seq[String]): Boolean
+  def killExecutors(executorIds: Seq[String]): Seq[String]
 
   /**
    * Request that the cluster manager kill the specified executor.
    * @return whether the request is acknowledged by the cluster manager.
    */
-  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+  def killExecutor(executorId: String): Boolean = {
+    val killedExecutors = killExecutors(Seq(executorId))
+    killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6f320c5..1366251 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.ControlThrowable
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
 
     updateAndSyncNumExecutorsTarget(now)
 
+    val executorIdsToBeRemoved = ArrayBuffer[String]()
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
       if (expired) {
         initializing = false
-        removeExecutor(executorId)
+        executorIdsToBeRemoved += executorId
       }
       !expired
     }
+    if (executorIdsToBeRemoved.nonEmpty) {
+      removeExecutors(executorIdsToBeRemoved)
+    }
   }
 
   /**
@@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Returns the ids of the executors whose removal was acknowledged (now pending removal).
+   */
+  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
+    val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+    logInfo("Request to remove executorIds: " + executors.mkString(", "))
+    val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+
+    var newExecutorTotal = numExistingExecutors
+    executors.foreach { executorIdToBeRemoved =>
+      if (newExecutorTotal - 1 < minNumExecutors) {
+        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
+          s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
+      } else if (canBeKilled(executorIdToBeRemoved)) {
+        executorIdsToBeRemoved += executorIdToBeRemoved
+        newExecutorTotal -= 1
+      }
+    }
+
+    if (executorIdsToBeRemoved.isEmpty) {
+      return Seq.empty[String]
+    }
+
+    // Send a request to the backend to kill the selected executor(s)
+    val executorsRemoved = if (testing) {
+      executorIdsToBeRemoved
+    } else {
+      client.killExecutors(executorIdsToBeRemoved)
+    }
+    // reset the newExecutorTotal to the existing number of executors
+    newExecutorTotal = numExistingExecutors
+    if (testing || executorsRemoved.nonEmpty) {
+      executorsRemoved.foreach { removedExecutorId =>
+        newExecutorTotal -= 1
+        logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
+          s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
+        executorsPendingToRemove.add(removedExecutorId)
+      }
+      executorsRemoved
+    } else {
+      logWarning(s"Unable to reach the cluster manager to kill executor/s " +
+        s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
+      Seq.empty[String]
+    }
+  }
+
+  /**
    * Request the cluster manager to remove the given executor.
-   * Return whether the request is received.
+   * Return whether the request is acknowledged.
    */
   private def removeExecutor(executorId: String): Boolean = synchronized {
+    val executorsRemoved = removeExecutors(Seq(executorId))
+    executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
+  }
+
+  /**
+   * Determine if the given executor can be killed.
+   */
+  private def canBeKilled(executorId: String): Boolean = synchronized {
     // Do not kill the executor if we are not aware of it (should never happen)
     if (!executorIds.contains(executorId)) {
       logWarning(s"Attempted to remove unknown executor $executorId!")
@@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
       return false
     }
 
-    // Do not kill the executor if we have already reached the lower bound
-    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
-    if (numExistingExecutors - 1 < minNumExecutors) {
-      logDebug(s"Not removing idle executor $executorId because there are only " +
-        s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
-      return false
-    }
-
-    // Send a request to the backend to kill this executor
-    val removeRequestAcknowledged = testing || client.killExecutor(executorId)
-    if (removeRequestAcknowledged) {
-      logInfo(s"Removing executor $executorId because it has been idle for " +
-        s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
-      executorsPendingToRemove.add(executorId)
-      true
-    } else {
-      logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
-        s"or no executor eligible to kill!")
-      false
-    }
+    true
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1981ad5..f58037e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -73,7 +73,7 @@ import org.apache.spark.util._
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
-class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
+class SparkContext(config: SparkConf) extends Logging {
 
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
@@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
-        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+        schedulerBackend match {
+          case b: ExecutorAllocationClient =>
+            Some(new ExecutorAllocationManager(
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+          case _ =>
+            None
+        }
       } else {
         None
       }
@@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     listenerBus.addListener(listener)
   }
 
-  private[spark] override def getExecutorIds(): Seq[String] = {
+  private[spark] def getExecutorIds(): Seq[String] = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.getExecutorIds()
@@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return whether the request is acknowledged by the cluster manager.
    */
   @DeveloperApi
-  override def requestTotalExecutors(
+  def requestTotalExecutors(
       numExecutors: Int,
       localityAwareTasks: Int,
       hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
@@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return whether the request is received.
    */
   @DeveloperApi
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return whether the request is received.
    */
   @DeveloperApi
-  override def killExecutors(executorIds: Seq[String]): Boolean = {
+  def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(executorIds, replace = false, force = true)
+        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
         false
@@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return whether the request is received.
    */
   @DeveloperApi
-  override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
+  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
 
   /**
    * Request that the cluster manager kill the specified executor without adjusting the
@@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(Seq(executorId), replace = true, force = true)
+        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
         false

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c6b3fdf..edc3c19 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @return whether the kill request is acknowledged. If list to kill is empty, it will return
    *         false.
    */
-  final override def killExecutors(executorIds: Seq[String]): Boolean = {
+  final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
     killExecutors(executorIds, replace = false, force = false)
   }
 
@@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   final def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
-      force: Boolean): Boolean = {
+      force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
     val response = synchronized {
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         .filter { id => force || !scheduler.isExecutorBusy(id) }
       executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
 
+      logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
+
       // If we do not wish to replace the executors we kill, sync the target number of executors
       // with the cluster manager to avoid allocating new ones. When computing the new target,
       // take into account executors that are pending to be added or removed.
@@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           _ => Future.successful(false)
         }
 
-      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+      val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+      killResponse.flatMap(killSuccessful =>
+        Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
+      )(ThreadUtils.sameThread)
     }
 
     defaultAskTimeout.awaitResult(response)

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
index 757c6d2..cf8565c 100644
--- a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -1,2 +1,3 @@
 org.apache.spark.scheduler.DummyExternalClusterManager
-org.apache.spark.scheduler.MockExternalClusterManager
\ No newline at end of file
+org.apache.spark.scheduler.MockExternalClusterManager
+org.apache.spark.DummyLocalExternalClusterManager

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c130649..ec40971 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,7 +23,9 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.util.ManualClock
 
 /**
@@ -49,7 +51,7 @@ class ExecutorAllocationManagerSuite
 
   test("verify min/max executors") {
     val conf = new SparkConf()
-      .setMaster("local")
+      .setMaster("myDummyLocalExternalClusterManager")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
@@ -263,6 +265,55 @@ class ExecutorAllocationManagerSuite
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
+  test("remove multiple executors") {
+    sc = createSparkContext(5, 10, 5)
+    val manager = sc.executorAllocationManager.get
+    (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+
+    // Keep removing until the limit is reached
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutors(manager, Seq("1")) === Seq("1"))
+    assert(executorsPendingToRemove(manager).size === 1)
+    assert(executorsPendingToRemove(manager).contains("1"))
+    assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(executorsPendingToRemove(manager).contains("2"))
+    assert(executorsPendingToRemove(manager).contains("3"))
+    assert(!removeExecutor(manager, "100")) // remove non-existent executors
+    assert(removeExecutors(manager, Seq("101", "102")) !== Seq("101", "102"))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(removeExecutor(manager, "4"))
+    assert(removeExecutors(manager, Seq("5")) === Seq("5"))
+    assert(!removeExecutor(manager, "6")) // reached the limit of 5
+    assert(executorsPendingToRemove(manager).size === 5)
+    assert(executorsPendingToRemove(manager).contains("4"))
+    assert(executorsPendingToRemove(manager).contains("5"))
+    assert(!executorsPendingToRemove(manager).contains("6"))
+
+    // Kill executors previously requested to remove
+    onExecutorRemoved(manager, "1")
+    assert(executorsPendingToRemove(manager).size === 4)
+    assert(!executorsPendingToRemove(manager).contains("1"))
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 2)
+    assert(!executorsPendingToRemove(manager).contains("2"))
+    assert(!executorsPendingToRemove(manager).contains("3"))
+    onExecutorRemoved(manager, "2") // duplicates should not count
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 2)
+    onExecutorRemoved(manager, "4")
+    onExecutorRemoved(manager, "5")
+    assert(executorsPendingToRemove(manager).isEmpty)
+
+    // Try removing again
+    // This should still fail because the number pending + running is still at the limit
+    assert(!removeExecutor(manager, "7"))
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutors(manager, Seq("8")) !== Seq("8"))
+    assert(executorsPendingToRemove(manager).isEmpty)
+  }
+
   test ("interleaving add and remove") {
     sc = createSparkContext(5, 10, 5)
     val manager = sc.executorAllocationManager.get
@@ -283,8 +334,7 @@ class ExecutorAllocationManagerSuite
 
     // Remove until limit
     assert(removeExecutor(manager, "1"))
-    assert(removeExecutor(manager, "2"))
-    assert(removeExecutor(manager, "3"))
+    assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
     assert(!removeExecutor(manager, "4")) // lower limit reached
     assert(!removeExecutor(manager, "5"))
     onExecutorRemoved(manager, "1")
@@ -296,7 +346,7 @@ class ExecutorAllocationManagerSuite
     assert(addExecutors(manager) === 2) // upper limit reached
     assert(addExecutors(manager) === 0)
     assert(!removeExecutor(manager, "4")) // still at lower limit
-    assert(!removeExecutor(manager, "5"))
+    assert(removeExecutors(manager, Seq("5")) !== Seq("5"))
     onExecutorAdded(manager, "9")
     onExecutorAdded(manager, "10")
     onExecutorAdded(manager, "11")
@@ -305,9 +355,7 @@ class ExecutorAllocationManagerSuite
     assert(executorIds(manager).size === 10)
 
     // Remove succeeds again, now that we are no longer at the lower limit
-    assert(removeExecutor(manager, "4"))
-    assert(removeExecutor(manager, "5"))
-    assert(removeExecutor(manager, "6"))
+    assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
     assert(removeExecutor(manager, "7"))
     assert(executorIds(manager).size === 10)
     assert(addExecutors(manager) === 0)
@@ -870,8 +918,8 @@ class ExecutorAllocationManagerSuite
     assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
 
     removeExecutor(manager, "first")
-    removeExecutor(manager, "second")
-    assert(executorsPendingToRemove(manager) === Set("first", "second"))
+    removeExecutors(manager, Seq("second", "third"))
+    assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
     assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
 
 
@@ -895,7 +943,7 @@ class ExecutorAllocationManagerSuite
       maxExecutors: Int = 5,
       initialExecutors: Int = 1): SparkContext = {
     val conf = new SparkConf()
-      .setMaster("local")
+      .setMaster("myDummyLocalExternalClusterManager")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
@@ -953,6 +1001,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _updateAndSyncNumExecutorsTarget =
     PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
   private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
+  private val _removeExecutors = PrivateMethod[Seq[String]]('removeExecutors)
   private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
   private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
   private val _onSchedulerBacklogged = PrivateMethod[Unit]('onSchedulerBacklogged)
@@ -1008,6 +1057,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _removeExecutor(id)
   }
 
+  private def removeExecutors(manager: ExecutorAllocationManager, ids: Seq[String]): Seq[String] = {
+    manager invokePrivate _removeExecutors(ids)
+  }
+
   private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
     manager invokePrivate _onExecutorAdded(id)
   }
@@ -1040,3 +1093,65 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _hostToLocalTaskCount()
   }
 }
+
+/**
+ * A cluster manager which wraps around the scheduler and backend for local mode. It is used for
+ * testing the dynamic allocation policy.
+ */
+private class DummyLocalExternalClusterManager extends ExternalClusterManager {
+
+  def canCreate(masterURL: String): Boolean = masterURL == "myDummyLocalExternalClusterManager"
+
+  override def createTaskScheduler(
+      sc: SparkContext,
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+
+  override def createSchedulerBackend(
+      sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    val sb = new LocalSchedulerBackend(sc.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], 1)
+    new DummyLocalSchedulerBackend(sc, sb)
+  }
+
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
+    sc.initialize(backend)
+  }
+}
+
+/**
+ * A scheduler backend which wraps around local scheduler backend and exposes the executor
+ * allocation client interface for testing dynamic allocation.
+ */
+private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend)
+  extends SchedulerBackend with ExecutorAllocationClient {
+
+  override private[spark] def getExecutorIds(): Seq[String] = sc.getExecutorIds()
+
+  override private[spark] def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean =
+    sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+
+  override def requestExecutors(numAdditionalExecutors: Int): Boolean =
+    sc.requestExecutors(numAdditionalExecutors)
+
+  override def killExecutors(executorIds: Seq[String]): Seq[String] = {
+    val response = sc.killExecutors(executorIds)
+    if (response) {
+      executorIds
+    } else {
+      Seq.empty[String]
+    }
+  }
+
+  override def start(): Unit = sb.start()
+
+  override def stop(): Unit = sb.stop()
+
+  override def reviveOffers(): Unit = sb.reviveOffers()
+
+  override def defaultParallelism(): Int = sb.defaultParallelism()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 8140270..e29eb85 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -438,12 +438,12 @@ class StandaloneDynamicAllocationSuite
     val executorIdToTaskCount = taskScheduler invokePrivate getMap()
     executorIdToTaskCount(executors.head) = 1
     // kill the busy executor without force; this should fail
-    assert(!killExecutor(sc, executors.head, force = false))
+    assert(killExecutor(sc, executors.head, force = false).isEmpty)
     apps = getApplications()
     assert(apps.head.executors.size === 2)
 
     // force kill busy executor
-    assert(killExecutor(sc, executors.head, force = true))
+    assert(killExecutor(sc, executors.head, force = true).nonEmpty)
     apps = getApplications()
     // kill executor successfully
     assert(apps.head.executors.size === 1)
@@ -518,7 +518,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   /** Kill the given executor, specifying whether to force kill it. */
-  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Seq[String] = {
     syncExecutors(sc)
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f13f3ff..0a56a6b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -818,6 +818,9 @@ object MimaExcludes {
     ) ++ Seq(
       // [SPARK-17017] Add chiSquare selector based on False Positive Rate (FPR) test
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.ChiSqSelectorModel.isSorted")
+    ) ++ Seq(
+      // [SPARK-17365][Core] Remove/Kill multiple executors together to reduce RPC call time
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkContext")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index fb5587e..7b29b40 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -226,7 +226,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
       conf: SparkConf,
       batchDurationMs: Long,
       clock: Clock): Option[ExecutorAllocationManager] = {
-    if (isDynamicAllocationEnabled(conf)) {
+    if (isDynamicAllocationEnabled(conf) && client != null) {
       Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock))
     } else None
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 79d6254..dbc50da 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -24,6 +24,7 @@ import scala.util.Failure
 
 import org.apache.commons.lang3.SerializationUtils
 
+import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming._
@@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.start()
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
+
+    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
+      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
+      case _ => null
+    }
+
     executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
-      ssc.sparkContext,
+      executorAllocClient,
       receiverTracker,
       ssc.conf,
       ssc.graph.batchDuration.milliseconds,

http://git-wip-us.apache.org/repos/asf/spark/blob/17b72d31/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7630f4a..b49e579 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -380,8 +380,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
   }
 
   private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = {
-    conf.setMaster("local").setAppName(this.getClass.getSimpleName).set(
-      "spark.streaming.dynamicAllocation.testing", "true")  // to test dynamic allocation
+    conf.setMaster("myDummyLocalExternalClusterManager")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.dynamicAllocation.testing", "true")  // to test dynamic allocation
 
     var ssc: StreamingContext = null
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org