Posted to commits@spark.apache.org by an...@apache.org on 2015/07/17 04:39:58 UTC

spark git commit: [SPARK-8119] HeartbeatReceiver should replace executors, not kill

Repository: spark
Updated Branches:
  refs/heads/master d86bbb4e2 -> 96aa3340f


[SPARK-8119] HeartbeatReceiver should replace executors, not kill

**Symptom.** If an executor's heartbeat times out, `HeartbeatReceiver` attempts to kill that executor. After this happens, however, the application never gets a replacement, even when there are cluster resources available.

**Cause.** The issue is that `sc.killExecutor` assumes the application wishes to adjust its resource requirements permanently downwards. That is not the intention in `HeartbeatReceiver`, however, which simply wants a replacement for the expired executor.

**Fix.** Differentiate between the intention to kill an executor and the intention to replace it with a fresh one. More details can be found in the commit message.
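
Below is a minimal, self-contained sketch (not Spark code; the object, field, and method names are illustrative only) of the target-count arithmetic the scheduler backend uses after this patch. It shows why a plain kill shrinks the application's resource target, while a kill-with-replace leaves the target alone so that the cluster manager eventually launches a fresh executor:

```scala
// Sketch only: mirrors the arithmetic in CoarseGrainedSchedulerBackend.killExecutors
// after this patch. All names here are illustrative, not Spark APIs.
object KillVsReplaceSketch {
  final case class BackendState(
      numExistingExecutors: Int, // executors currently registered
      numPendingExecutors: Int,  // requested but not yet launched
      numPendingToRemove: Int)   // kill requests already in flight

  /**
   * Target total to sync with the cluster manager, if any.
   * When `replace` is true, no new target is sent, so the cluster manager
   * still owes the application its original number of executors and will
   * eventually launch replacements for the killed ones.
   */
  def targetToSync(s: BackendState, killedNow: Int, replace: Boolean): Option[Int] =
    if (replace) {
      None
    } else {
      Some(s.numExistingExecutors + s.numPendingExecutors - s.numPendingToRemove - killedNow)
    }

  def main(args: Array[String]): Unit = {
    val s = BackendState(numExistingExecutors = 10, numPendingExecutors = 0, numPendingToRemove = 0)
    println(targetToSync(s, killedNow = 1, replace = false)) // Some(9): resource target shrinks
    println(targetToSync(s, killedNow = 1, replace = true))  // None: target stays at 10
  }
}
```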

Author: Andrew Or <an...@databricks.com>

Closes #7107 from andrewor14/heartbeat-no-kill and squashes the following commits:

1cd2cd7 [Andrew Or] Add regression test for SPARK-8119
25a347d [Andrew Or] Reuse more code in scheduler backend
31ebd40 [Andrew Or] Differentiate between kill and replace


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96aa3340
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96aa3340
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96aa3340

Branch: refs/heads/master
Commit: 96aa3340f41d8de4560caec97e8f3de23252c792
Parents: d86bbb4
Author: Andrew Or <an...@databricks.com>
Authored: Thu Jul 16 19:39:54 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Jul 16 19:39:54 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/HeartbeatReceiver.scala    |   4 +-
 .../scala/org/apache/spark/SparkContext.scala   |  40 ++++-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  40 +++--
 .../apache/spark/HeartbeatReceiverSuite.scala   | 147 ++++++++++++++++---
 4 files changed, 194 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 221b1da..43dd4a1 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -181,7 +181,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
           // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
-              sc.killExecutor(executorId)
+              // Note: we want to get an executor back after expiring this one,
+              // so do not simply call `sc.killExecutor` here (SPARK-8119)
+              sc.killAndReplaceExecutor(executorId)
             }
           })
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bd1cc33..d00c012 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1419,6 +1419,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
+   *
+   * Note: This is an indication to the cluster manager that the application wishes to adjust
+   * its resource usage downwards. If the application wishes to replace the executors it kills
+   * through this method with new ones, it should follow up explicitly with a call to
+   * {{SparkContext#requestExecutors}}.
+   *
    * This is currently only supported in YARN mode. Return whether the request is received.
    */
   @DeveloperApi
@@ -1436,12 +1442,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * :: DeveloperApi ::
-   * Request that cluster manager the kill the specified executor.
-   * This is currently only supported in Yarn mode. Return whether the request is received.
+   * Request that the cluster manager kill the specified executor.
+   *
+   * Note: This is an indication to the cluster manager that the application wishes to adjust
+   * its resource usage downwards. If the application wishes to replace the executor it kills
+   * through this method with a new one, it should follow up explicitly with a call to
+   * {{SparkContext#requestExecutors}}.
+   *
+   * This is currently only supported in YARN mode. Return whether the request is received.
    */
   @DeveloperApi
   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
 
+  /**
+   * Request that the cluster manager kill the specified executor without adjusting the
+   * application resource requirements.
+   *
+   * The effect is that a new executor will be launched in place of the one killed by
+   * this request. This assumes the cluster manager will automatically and eventually
+   * fulfill all missing application resource requests.
+   *
+   * Note: The replace is by no means guaranteed; another application on the same cluster
+   * can steal the window of opportunity and acquire this application's resources in the
+   * mean time.
+   *
+   * This is currently only supported in YARN mode. Return whether the request is received.
+   */
+  private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(Seq(executorId), replace = true)
+      case _ =>
+        logWarning("Killing executors is only supported in coarse-grained mode")
+        false
+    }
+  }
+
   /** The version of Spark on which this application is running. */
   def version: String = SPARK_VERSION
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7c7f70d..0e3215d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -371,26 +371,36 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   /**
    * Request that the cluster manager kill the specified executors.
-   * Return whether the kill request is acknowledged.
+   * @return whether the kill request is acknowledged.
    */
   final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+    killExecutors(executorIds, replace = false)
+  }
+
+  /**
+   * Request that the cluster manager kill the specified executors.
+   *
+   * @param executorIds identifiers of executors to kill
+   * @param replace whether to replace the killed executors with new ones
+   * @return whether the kill request is acknowledged.
+   */
+  final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
-    val filteredExecutorIds = new ArrayBuffer[String]
-    executorIds.foreach { id =>
-      if (executorDataMap.contains(id)) {
-        filteredExecutorIds += id
-      } else {
-        logWarning(s"Executor to kill $id does not exist!")
-      }
+    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+    unknownExecutors.foreach { id =>
+      logWarning(s"Executor to kill $id does not exist!")
+    }
+
+    // If we do not wish to replace the executors we kill, sync the target number of executors
+    // with the cluster manager to avoid allocating new ones. When computing the new target,
+    // take into account executors that are pending to be added or removed.
+    if (!replace) {
+      doRequestTotalExecutors(numExistingExecutors + numPendingExecutors
+        - executorsPendingToRemove.size - knownExecutors.size)
     }
-    // Killing executors means effectively that we want less executors than before, so also update
-    // the target number of executors to avoid having the backend allocate new ones.
-    val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
-      - filteredExecutorIds.size)
-    doRequestTotalExecutors(newTotal)
 
-    executorsPendingToRemove ++= filteredExecutorIds
-    doKillExecutors(filteredExecutorIds)
+    executorsPendingToRemove ++= knownExecutors
+    doKillExecutors(knownExecutors)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index b31b091..5a2670e 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{ExecutorService, TimeUnit}
+
+import scala.collection.mutable
 import scala.language.postfixOps
 
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
@@ -25,11 +28,16 @@ import org.mockito.Matchers
 import org.mockito.Matchers._
 
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv, RpcEndpointRef}
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.ManualClock
 
+/**
+ * A test suite for the heartbeating behavior between the driver and the executors.
+ */
 class HeartbeatReceiverSuite
   extends SparkFunSuite
   with BeforeAndAfterEach
@@ -40,23 +48,40 @@ class HeartbeatReceiverSuite
   private val executorId2 = "executor-2"
 
   // Shared state that must be reset before and after each test
-  private var scheduler: TaskScheduler = null
+  private var scheduler: TaskSchedulerImpl = null
   private var heartbeatReceiver: HeartbeatReceiver = null
   private var heartbeatReceiverRef: RpcEndpointRef = null
   private var heartbeatReceiverClock: ManualClock = null
 
+  // Helper private method accessors for HeartbeatReceiver
+  private val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen)
+  private val _executorTimeoutMs = PrivateMethod[Long]('executorTimeoutMs)
+  private val _killExecutorThread = PrivateMethod[ExecutorService]('killExecutorThread)
+
+  /**
+   * Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]]
+   * that uses a manual clock.
+   */
   override def beforeEach(): Unit = {
-    sc = spy(new SparkContext("local[2]", "test"))
-    scheduler = mock(classOf[TaskScheduler])
+    val conf = new SparkConf()
+      .setMaster("local[2]")
+      .setAppName("test")
+      .set("spark.dynamicAllocation.testing", "true")
+    sc = spy(new SparkContext(conf))
+    scheduler = mock(classOf[TaskSchedulerImpl])
     when(sc.taskScheduler).thenReturn(scheduler)
+    when(scheduler.sc).thenReturn(sc)
     heartbeatReceiverClock = new ManualClock
     heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
     heartbeatReceiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat", heartbeatReceiver)
     when(scheduler.executorHeartbeatReceived(any(), any(), any())).thenReturn(true)
   }
 
+  /**
+   * After each test, clean up all state and stop the [[SparkContext]].
+   */
   override def afterEach(): Unit = {
-    resetSparkContext()
+    super.afterEach()
     scheduler = null
     heartbeatReceiver = null
     heartbeatReceiverRef = null
@@ -75,7 +100,7 @@ class HeartbeatReceiverSuite
     heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
     triggerHeartbeat(executorId1, executorShouldReregister = false)
     triggerHeartbeat(executorId2, executorShouldReregister = false)
-    val trackedExecutors = executorLastSeen(heartbeatReceiver)
+    val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
     assert(trackedExecutors.size === 2)
     assert(trackedExecutors.contains(executorId1))
     assert(trackedExecutors.contains(executorId2))
@@ -83,15 +108,15 @@ class HeartbeatReceiverSuite
 
   test("reregister if scheduler is not ready yet") {
     heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
-    // Task scheduler not set in HeartbeatReceiver
+    // Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
     triggerHeartbeat(executorId1, executorShouldReregister = true)
   }
 
   test("reregister if heartbeat from unregistered executor") {
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
-    // Received heartbeat from unknown receiver, so we ask it to re-register
+    // Received heartbeat from unknown executor, so we ask it to re-register
     triggerHeartbeat(executorId1, executorShouldReregister = true)
-    assert(executorLastSeen(heartbeatReceiver).isEmpty)
+    assert(heartbeatReceiver.invokePrivate(_executorLastSeen()).isEmpty)
   }
 
   test("reregister if heartbeat from removed executor") {
@@ -104,14 +129,14 @@ class HeartbeatReceiverSuite
     // A heartbeat from the second executor should require reregistering
     triggerHeartbeat(executorId1, executorShouldReregister = false)
     triggerHeartbeat(executorId2, executorShouldReregister = true)
-    val trackedExecutors = executorLastSeen(heartbeatReceiver)
+    val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
     assert(trackedExecutors.size === 1)
     assert(trackedExecutors.contains(executorId1))
     assert(!trackedExecutors.contains(executorId2))
   }
 
   test("expire dead hosts") {
-    val executorTimeout = executorTimeoutMs(heartbeatReceiver)
+    val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
     heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
@@ -124,12 +149,61 @@ class HeartbeatReceiverSuite
     heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
     // Only the second executor should be expired as a dead host
     verify(scheduler).executorLost(Matchers.eq(executorId2), any())
-    val trackedExecutors = executorLastSeen(heartbeatReceiver)
+    val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
     assert(trackedExecutors.size === 1)
     assert(trackedExecutors.contains(executorId1))
     assert(!trackedExecutors.contains(executorId2))
   }
 
+  test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
+    // Set up a fake backend and cluster manager to simulate killing executors
+    val rpcEnv = sc.env.rpcEnv
+    val fakeClusterManager = new FakeClusterManager(rpcEnv)
+    val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
+    val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
+    when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+    // Register fake executors with our fake scheduler backend
+    // This is necessary because the backend refuses to kill executors it does not know about
+    fakeSchedulerBackend.start()
+    val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
+    val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
+    val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
+    val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
+    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
+    triggerHeartbeat(executorId1, executorShouldReregister = false)
+    triggerHeartbeat(executorId2, executorShouldReregister = false)
+
+    // Adjust the target number of executors on the cluster manager side
+    assert(fakeClusterManager.getTargetNumExecutors === 0)
+    sc.requestTotalExecutors(2)
+    assert(fakeClusterManager.getTargetNumExecutors === 2)
+    assert(fakeClusterManager.getExecutorIdsToKill.isEmpty)
+
+    // Expire the executors. This should trigger our fake backend to kill the executors.
+    // Since the kill request is sent to the cluster manager asynchronously, we need to block
+    // on the kill thread to ensure that the cluster manager actually received our requests.
+    // Here we use a timeout of O(seconds), but in practice this whole test takes O(10ms).
+    val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
+    heartbeatReceiverClock.advance(executorTimeout * 2)
+    heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
+    val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
+    killThread.shutdown() // needed for awaitTermination
+    killThread.awaitTermination(10L, TimeUnit.SECONDS)
+
+    // The target number of executors should not change! Otherwise, having an expired
+    // executor means we permanently adjust the target number downwards until we
+    // explicitly request new executors. For more detail, see SPARK-8119.
+    assert(fakeClusterManager.getTargetNumExecutors === 2)
+    assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
+  }
+
   /** Manually send a heartbeat and return the response. */
   private def triggerHeartbeat(
       executorId: String,
@@ -148,14 +222,49 @@ class HeartbeatReceiverSuite
     }
   }
 
-  // Helper methods to access private fields in HeartbeatReceiver
-  private val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen)
-  private val _executorTimeoutMs = PrivateMethod[Long]('executorTimeoutMs)
-  private def executorLastSeen(receiver: HeartbeatReceiver): collection.Map[String, Long] = {
-    receiver invokePrivate _executorLastSeen()
+}
+
+// TODO: use these classes to add end-to-end tests for dynamic allocation!
+
+/**
+ * Dummy RPC endpoint to simulate executors.
+ */
+private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint
+
+/**
+ * Dummy scheduler backend to simulate executor allocation requests to the cluster manager.
+ */
+private class FakeSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    rpcEnv: RpcEnv,
+    clusterManagerEndpoint: RpcEndpointRef)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    clusterManagerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
   }
-  private def executorTimeoutMs(receiver: HeartbeatReceiver): Long = {
-    receiver invokePrivate _executorTimeoutMs()
+
+  protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
   }
+}
 
+/**
+ * Dummy cluster manager to simulate responses to executor allocation requests.
+ */
+private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+  private var targetNumExecutors = 0
+  private val executorIdsToKill = new mutable.HashSet[String]
+
+  def getTargetNumExecutors: Int = targetNumExecutors
+  def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RequestExecutors(requestedTotal) =>
+      targetNumExecutors = requestedTotal
+      context.reply(true)
+    case KillExecutors(executorIds) =>
+      executorIdsToKill ++= executorIds
+      context.reply(true)
+  }
 }

