Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/07 19:34:28 UTC

spark git commit: [SPARK-17370] Shuffle service files not invalidated when a slave is lost

Repository: spark
Updated Branches:
  refs/heads/master 76ad89e92 -> 649fa4bf1


[SPARK-17370] Shuffle service files not invalidated when a slave is lost

## What changes were proposed in this pull request?

When an executor loss event occurs, DAGScheduler invalidates that executor's shuffle files, but not when the external shuffle service is enabled: with the shuffle service on, shuffle files can outlive the executor that wrote them.
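
For context, here is a condensed sketch of the pre-patch check in `DAGScheduler.handleExecutorLost` (the full hunk appears in the diff below): map outputs were unregistered only when the executor served its own blocks, or when a FetchFailed had already implicated it.

    // Pre-patch logic, condensed; the names are the real ones from DAGScheduler.scala.
    // With the external shuffle service enabled and no FetchFailed, the lost
    // executor's map outputs were assumed to still be servable and were kept.
    if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
      for ((shuffleId, stage) <- shuffleIdToMapStage) {
        stage.removeOutputsOnExecutor(execId)
      }
    }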

However, it also doesn't invalidate shuffle files when the shuffle service itself is lost (due to whole-slave loss). This can cause long hangs when slaves are lost, since the file loss is not detected until a subsequent stage attempts to read the shuffle files.

The proposed fix is to also invalidate shuffle files when an executor is lost due to a `SlaveLost` event that reports the worker itself as lost (`workerLost = true`).
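
Concretely, the executor loss event now carries its `ExecutorLossReason`, and the event loop maps a confirmed worker loss onto file invalidation; condensed from the DAGScheduler hunks below:

    case ExecutorLost(execId, reason) =>
      // Only SlaveLost with workerLost = true implies the slave's shuffle
      // service (and hence its shuffle files) is gone along with the executor.
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)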

## How was this patch tested?

Unit tests. Also verified on an actual cluster that slave loss invalidates shuffle files immediately, as expected.
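
The crux of the new table-driven `DAGSchedulerSuite` check (full version in the diff below) is that after a confirmed worker loss the map outputs are unregistered immediately, before any stage tries to fetch them:

    runEvent(ExecutorLost("exec-hostA", SlaveLost("", true)))
    // The loss now surfaces eagerly instead of on the next shuffle read:
    intercept[MetadataFetchFailedException] {
      mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
    }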

cc mateiz

Author: Eric Liang <ek...@databricks.com>

Closes #14931 from ericl/sc-4439.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649fa4bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649fa4bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649fa4bf

Branch: refs/heads/master
Commit: 649fa4bf1d6fc9271ae56b6891bc93ebf57858d1
Parents: 76ad89e
Author: Eric Liang <ek...@databricks.com>
Authored: Wed Sep 7 12:33:50 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed Sep 7 12:33:50 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  2 +-
 .../deploy/client/StandaloneAppClient.scala     |  4 +-
 .../client/StandaloneAppClientListener.scala    |  3 +-
 .../org/apache/spark/deploy/master/Master.scala |  4 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 +++++---
 .../spark/scheduler/DAGSchedulerEvent.scala     |  3 +-
 .../spark/scheduler/ExecutorLossReason.scala    |  6 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  9 ++-
 .../cluster/StandaloneSchedulerBackend.scala    |  5 +-
 .../spark/deploy/client/AppClientSuite.scala    |  3 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 58 +++++++++++++++++---
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 12 files changed, 92 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 34c0696..ac09c6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -135,7 +135,7 @@ private[deploy] object DeployMessages {
   }
 
   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int])
+    exitStatus: Option[Int], workerLost: Boolean)
 
   case class ApplicationRemoved(message: String)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 7a60f08..93f58ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -174,12 +174,12 @@ private[spark] class StandaloneAppClient(
           cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
-      case ExecutorUpdated(id, state, message, exitStatus) =>
+      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
         val fullId = appId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
         }
 
       case MasterChanged(masterRef, masterWebUiUrl) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 370b16c..64255ec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -36,5 +36,6 @@ private[spark] trait StandaloneAppClientListener {
   def executorAdded(
       fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
 
-  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+  def executorRemoved(
+      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dfffc47..dcf4163 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -252,7 +252,7 @@ private[deploy] class Master(
             appInfo.resetRetryCount()
           }
 
-          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
 
           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
@@ -766,7 +766,7 @@ private[deploy] class Master(
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None))
+        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4eb7c81..dd47c1d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -239,8 +239,8 @@ class DAGScheduler(
   /**
    * Called by TaskScheduler implementation when an executor fails.
    */
-  def executorLost(execId: String): Unit = {
-    eventProcessLoop.post(ExecutorLost(execId))
+  def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
+    eventProcessLoop.post(ExecutorLost(execId, reason))
   }
 
   /**
@@ -1281,7 +1281,7 @@ class DAGScheduler(
 
           // TODO: mark the executor as failed only if there were lots of fetch failures on it
           if (bmAddress != null) {
-            handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
+            handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
           }
         }
 
@@ -1306,15 +1306,16 @@ class DAGScheduler(
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
-   * occurred, in which case we presume all shuffle data related to this executor to be lost.
+   * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
+   * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
+   * presume all shuffle data related to this executor to be lost.
    *
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      fetchFailed: Boolean,
+      filesLost: Boolean,
       maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
@@ -1322,7 +1323,8 @@ class DAGScheduler(
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
 
-      if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
+        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleIdToMapStage) {
           stage.removeOutputsOnExecutor(execId)
@@ -1624,8 +1626,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorAdded(execId, host) =>
       dagScheduler.handleExecutorAdded(execId, host)
 
-    case ExecutorLost(execId) =>
-      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
+    case ExecutorLost(execId, reason) =>
+      val filesLost = reason match {
+        case SlaveLost(_, true) => true
+        case _ => false
+      }
+      dagScheduler.handleExecutorLost(execId, filesLost)
 
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 8c76112..03781a2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -77,7 +77,8 @@ private[scheduler] case class CompletionEvent(
 
 private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
 
-private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
+private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
+  extends DAGSchedulerEvent
 
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 642bf81..46a35b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed
  */
 private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
 
+/**
+ * @param _message human readable loss reason
+ * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ */
 private[spark]
-case class SlaveLost(_message: String = "Slave lost")
+case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
   extends ExecutorLossReason(_message)
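
Usage note: both flavors of the new flag appear in this patch, condensed here from the Master.scala hunk above and the TaskSchedulerImpl.scala hunk below (the task id is a hypothetical placeholder):

    // Executor-only failure: the worker, and any shuffle service on it, is still up.
    SlaveLost("Task 42 was lost, so marking the executor as lost as well.")
    // Whole worker confirmed gone, taking a co-located shuffle service with it.
    SlaveLost("worker lost", workerLost = true)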

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7d90553..ee5cbfe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -346,6 +346,7 @@ private[spark] class TaskSchedulerImpl(
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
+    var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -353,8 +354,9 @@ private[spark] class TaskSchedulerImpl(
           val execId = taskIdToExecutorId(tid)
 
           if (executorIdToTaskCount.contains(execId)) {
-            removeExecutor(execId,
+            reason = Some(
               SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
+            removeExecutor(execId, reason.get)
             failedExecutor = Some(execId)
           }
         }
@@ -387,7 +389,8 @@ private[spark] class TaskSchedulerImpl(
     }
     // Update the DAGScheduler without holding a lock on this, since that can deadlock
     if (failedExecutor.isDefined) {
-      dagScheduler.executorLost(failedExecutor.get)
+      assert(reason.isDefined)
+      dagScheduler.executorLost(failedExecutor.get, reason.get)
       backend.reviveOffers()
     }
   }
@@ -513,7 +516,7 @@ private[spark] class TaskSchedulerImpl(
     }
     // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
     if (failedExecutor.isDefined) {
-      dagScheduler.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get, reason)
       backend.reviveOffers()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 5068bf2..04d40e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -150,10 +150,11 @@ private[spark] class StandaloneSchedulerBackend(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
 
-  override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
+  override def executorRemoved(
+      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean) {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => SlaveLost(message)
+      case None => SlaveLost(message, workerLost = workerLost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 416efaa..bc58fb2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -210,7 +210,8 @@ class AppClientSuite
       execAddedList.add(id)
     }
 
-    def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+    def executorRemoved(
+        id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
       execRemovedList.add(id)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3382474..6787b30 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
 
@@ -201,7 +202,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
   override def beforeEach(): Unit = {
     super.beforeEach()
-    sc = new SparkContext("local", "DAGSchedulerSuite")
+    init(new SparkConf())
+  }
+
+  private def init(testConf: SparkConf): Unit = {
+    sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
@@ -621,6 +626,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
+  private val shuffleFileLossTests = Seq(
+    ("slave lost with shuffle service", SlaveLost("", false), true, false),
+    ("worker lost with shuffle service", SlaveLost("", true), true, true),
+    ("worker lost without shuffle service", SlaveLost("", true), false, true),
+    ("executor failure with shuffle service", ExecutorKilled, true, false),
+    ("executor failure without shuffle service", ExecutorKilled, false, true))
+
+  for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
+    val maybeLost = if (expectFileLoss) {
+      "lost"
+    } else {
+      "not lost"
+    }
+    test(s"shuffle files $maybeLost when $eventDescription") {
+      // reset the test context with the right shuffle service config
+      afterEach()
+      val conf = new SparkConf()
+      conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+      init(conf)
+      assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
+
+      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+      val shuffleId = shuffleDep.shuffleId
+      val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+      submit(reduceRdd, Array(0))
+      complete(taskSets(0), Seq(
+        (Success, makeMapStatus("hostA", 1)),
+        (Success, makeMapStatus("hostB", 1))))
+      runEvent(ExecutorLost("exec-hostA", event))
+      if (expectFileLoss) {
+        intercept[MetadataFetchFailedException] {
+          mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+        }
+      } else {
+        assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+          HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+      }
+    }
+  }
 
   // Helper function to validate state when creating tests for task failures
   private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
@@ -628,7 +673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(stageAttempt.stageAttemptId == attempt)
   }
 
-
   // Helper functions to extract commonly used code in Fetch Failure test cases
   private def setupStageAbortTest(sc: SparkContext) {
     sc.listenerBus.addListener(new EndListener())
@@ -1110,7 +1154,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
     // pretend we were told hostA went away
     val oldEpoch = mapOutputTracker.getEpoch
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
 
@@ -1241,7 +1285,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     ))
 
     // then one executor dies, and a task fails in stage 1
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(0),
       FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
@@ -1339,7 +1383,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       makeMapStatus("hostA", reduceRdd.partitions.length)))
 
     // now that host goes down
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
 
     // so we resubmit those tasks
     runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
@@ -1532,7 +1576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     submit(reduceRdd, Array(0))
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
@@ -1999,7 +2043,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
     // Pretend host A was lost
     val oldEpoch = mapOutputTracker.getEpoch
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649fa4bf/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 36d1c56..7d6ad08 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -46,7 +46,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
 
   override def executorAdded(execId: String, host: String) {}
 
-  override def executorLost(execId: String) {}
+  override def executorLost(execId: String, reason: ExecutorLossReason) {}
 
   override def taskSetFailed(
       taskSet: TaskSet,

