Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/25 01:35:34 UTC

[03/20] git commit: Fixed most issues with unit tests

Fixed most issues with unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a124658e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a124658e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a124658e

Branch: refs/heads/master
Commit: a124658e53a5abeda00a2582385a294c8e452d21
Parents: 5e91495
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Oct 30 19:29:38 2013 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Oct 30 19:29:38 2013 -0700

----------------------------------------------------------------------
 .../spark/scheduler/DAGSchedulerSuite.scala     |  94 +++---
 .../org/apache/spark/scheduler/FakeTask.scala   |  26 ++
 .../spark/scheduler/TaskResultGetterSuite.scala | 111 +++++++
 .../spark/scheduler/TaskSchedulerSuite.scala    | 265 ++++++++++++++++
 .../spark/scheduler/TaskSetManagerSuite.scala   | 317 ++++++++++++++++++
 .../cluster/ClusterSchedulerSuite.scala         | 267 ----------------
 .../cluster/ClusterTaskSetManagerSuite.scala    | 318 -------------------
 .../spark/scheduler/cluster/FakeTask.scala      |  27 --
 .../cluster/TaskResultGetterSuite.scala         | 112 -------
 9 files changed, 767 insertions(+), 770 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 00f2fdd..394a1bb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -34,6 +34,24 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 
 /**
+ * TaskScheduler that records the task sets that the DAGScheduler requested executed.
+ */
+class TaskSetRecordingTaskScheduler(sc: SparkContext) extends TaskScheduler(sc) {
+  /** Set of TaskSets the DAGScheduler has requested executed. */
+  val taskSets = scala.collection.mutable.Buffer[TaskSet]()
+  override def start() = {}
+  override def stop() = {}
+  override def submitTasks(taskSet: TaskSet) = {
+    // normally done by TaskSetManager
+    taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
+    taskSets += taskSet
+  }
+  override def cancelTasks(stageId: Int) {}
+  override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+  override def defaultParallelism() = 2
+}
+
+/**
  * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
  * rather than spawning an event loop thread as happens in the real code. They use EasyMock
  * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
@@ -46,24 +64,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
  * and capturing the resulting TaskSets from the mock TaskScheduler.
  */
 class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
-  /** Set of TaskSets the DAGScheduler has requested executed. */
-  val taskSets = scala.collection.mutable.Buffer[TaskSet]()
-  val taskScheduler = new TaskScheduler() {
-    override def rootPool: Pool = null
-    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
-    override def start() = {}
-    override def stop() = {}
-    override def submitTasks(taskSet: TaskSet) = {
-      // normally done by TaskSetManager
-      taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
-      taskSets += taskSet
-    }
-    override def cancelTasks(stageId: Int) {}
-    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
-    override def defaultParallelism() = 2
-  }
-
+  var taskScheduler: TaskSetRecordingTaskScheduler = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var scheduler: DAGScheduler = null
 
@@ -96,7 +97,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
 
   before {
     sc = new SparkContext("local", "DAGSchedulerSuite")
-    taskSets.clear()
+    taskScheduler = new TaskSetRecordingTaskScheduler(sc)
+    taskScheduler.taskSets.clear()
     cacheLocations.clear()
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster()
@@ -204,7 +206,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   test("run trivial job") {
     val rdd = makeRdd(1, Nil)
     submit(rdd, Array(0))
-    complete(taskSets(0), List((Success, 42)))
+    complete(taskScheduler.taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -225,7 +227,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0))
-    complete(taskSets(0), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -235,7 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     cacheLocations(baseRdd.id -> 0) =
       Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
     submit(finalRdd, Array(0))
-    val taskSet = taskSets(0)
+    val taskSet = taskScheduler.taskSets(0)
     assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
     complete(taskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
@@ -243,7 +245,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
 
   test("trivial job failure") {
     submit(makeRdd(1, Nil), Array(0))
-    failed(taskSets(0), "some failure")
+    failed(taskScheduler.taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted: some failure")
   }
 
@@ -253,12 +255,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = makeRdd(1, List(shuffleDep))
     submit(reduceRdd, Array(0))
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
-    complete(taskSets(1), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(1), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -268,11 +270,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = makeRdd(2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     // the 2nd ResultTask failed
-    complete(taskSets(1), Seq(
+    complete(taskScheduler.taskSets(1), Seq(
         (Success, 42),
         (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
     // this will get called
@@ -280,10 +282,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskScheduler.taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
     // we can see both result blocks now
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
-    complete(taskSets(3), Seq((Success, 43)))
+    complete(taskScheduler.taskSets(3), Seq((Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
   }
 
@@ -299,7 +301,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
     val noAccum = Map[Long, Any]()
-    val taskSet = taskSets(0)
+    val taskSet = taskScheduler.taskSets(0)
     // should be ignored for being too old
     runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
     // should work because it's a non-failed host
@@ -311,7 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
-    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
+    complete(taskScheduler.taskSets(1), Seq((Success, 42), (Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
   }
 
@@ -326,14 +328,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     runEvent(ExecutorLost("exec-hostA"))
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it is as failed and waiting.
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
        (Success, makeMapStatus("hostB", 1))))
    // have hostC complete the resubmitted task
-   complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+   complete(taskScheduler.taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
           Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
-   complete(taskSets(2), Seq((Success, 42)))
+   complete(taskScheduler.taskSets(2), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
  }
 
@@ -345,23 +347,23 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val finalRdd = makeRdd(1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     // have the first stage complete normally
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 2)),
         (Success, makeMapStatus("hostB", 2))))
     // have the second stage complete normally
-    complete(taskSets(1), Seq(
+    complete(taskScheduler.taskSets(1), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostC", 1))))
     // fail the third stage because hostA went down
-    complete(taskSets(2), Seq(
+    complete(taskScheduler.taskSets(2), Seq(
         (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // have DAGScheduler try again
     scheduler.resubmitFailedStages()
-    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
-    complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
-    complete(taskSets(5), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+    complete(taskScheduler.taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskScheduler.taskSets(5), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -375,24 +377,24 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
     // complete stage 2
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 2)),
         (Success, makeMapStatus("hostB", 2))))
     // complete stage 1
-    complete(taskSets(1), Seq(
+    complete(taskScheduler.taskSets(1), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     // pretend stage 0 failed because hostA went down
-    complete(taskSets(2), Seq(
+    complete(taskScheduler.taskSets(2), Seq(
         (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
     scheduler.resubmitFailedStages()
-    assertLocations(taskSets(3), Seq(Seq("hostD")))
+    assertLocations(taskScheduler.taskSets(3), Seq(Seq("hostD")))
     // allow hostD to recover
-    complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
-    complete(taskSets(4), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
+    complete(taskScheduler.taskSets(4), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
new file mode 100644
index 0000000..0b90c4e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.TaskContext
+
+class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
+  override def runTask(context: TaskContext): Int = 0
+
+  override def preferredLocations: Seq[TaskLocation] = prefLocs
+}
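
As an aside, FakeTask is a no-op Task[Int] that only carries optional preferred locations, which is what lets the suites in this commit build TaskSets without a real RDD. A minimal sketch of how such a TaskSet can be assembled, reusing only names that appear in this diff; the sketch itself is illustrative and not part of the change:

    import org.apache.spark.scheduler.{FakeTask, Task, TaskLocation, TaskSet}

    // Two fake tasks: the first prefers exec1 on host1, the second has no locality preference.
    val tasks: Array[Task[_]] = Array(
      new FakeTask(0, Seq(TaskLocation("host1", "exec1"))),
      new FakeTask(0))
    // Same trailing arguments (0, 0, 0, null) as the "new TaskSet(tasks.toArray, 0, 0, 0, null)"
    // calls used by the scheduler suites below.
    val taskSet = new TaskSet(tasks, 0, 0, 0, null)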

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
new file mode 100644
index 0000000..30e6bc5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.storage.TaskResultBlockId
+
+/**
+ * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
+ *
+ * Used to test the case where a BlockManager evicts the task result (or dies) before the
+ * TaskResult is retrieved.
+ */
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskScheduler)
+  extends TaskResultGetter(sparkEnv, scheduler) {
+  var removedResult = false
+
+  override def enqueueSuccessfulTask(
+    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
+    if (!removedResult) {
+      // Only remove the result once, since we'd like to test the case where the task eventually
+      // succeeds.
+      serializer.get().deserialize[TaskResult[_]](serializedData) match {
+        case IndirectTaskResult(blockId) =>
+          sparkEnv.blockManager.master.removeBlock(blockId)
+        case directResult: DirectTaskResult[_] =>
+          taskSetManager.abort("Internal error: expect only indirect results") 
+      }
+      serializedData.rewind()
+      removedResult = true
+    }
+    super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
+  } 
+}
+
+/**
+ * Tests related to handling task results (both direct and indirect).
+ */
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
+  with LocalSparkContext {
+
+  override def beforeAll {
+    // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+    // as we can make it) so the tests don't take too long.
+    System.setProperty("spark.akka.frameSize", "1")
+  }
+
+  before {
+    sc = new SparkContext("local", "test")
+  }
+
+  override def afterAll {
+    System.clearProperty("spark.akka.frameSize")
+  }
+
+  test("handling results smaller than Akka frame size") {
+    val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+    assert(result === 2)
+  }
+
+  test("handling results larger than Akka frame size") { 
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    assert(result === 1.to(akkaFrameSize).toArray)
+
+    val RESULT_BLOCK_ID = TaskResultBlockId(0)
+    assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
+      "Expect result to be removed from the block manager.")
+  }
+
+  test("task retried if result missing from block manager") {
+    // If this test hangs, it's probably because no resource offers were made after the task
+    // failed.
+    val scheduler: TaskScheduler = sc.taskScheduler match {
+      case clusterScheduler: TaskScheduler =>
+        clusterScheduler
+      case _ =>
+        assert(false, "Expect local cluster to use TaskScheduler")
+        throw new ClassCastException
+    }
+    scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    assert(result === 1.to(akkaFrameSize).toArray)
+
+    // Make sure two tasks were run (one failed one, and a second retried one).
+    assert(scheduler.nextTaskId.get() === 2)
+  }
+}
+
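The two size-based tests and the retry test above rest on the same distinction: a task result small enough to fit in an Akka frame is sent back directly, while a larger one is parked in the BlockManager and only an IndirectTaskResult carrying its block id travels back, which is why deleting that block (as ResultDeletingTaskResultGetter does) forces the task to be rerun. A rough schematic of that decision, assuming it keys off the serialized size; DirectTaskResult, IndirectTaskResult, TaskResultBlockId and TaskMetrics are names from this diff, everything else is illustrative and not Spark's actual executor code:

    import scala.collection.mutable
    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.storage.TaskResultBlockId

    // Schematic only: pick a direct or indirect result based on serialized size.
    def sketchChooseResult(value: Array[Byte], tid: Long, akkaFrameSize: Int): TaskResult[_] = {
      if (value.length < akkaFrameSize) {
        // Small result: ship the value itself in the status update.
        new DirectTaskResult(value, mutable.Map.empty, new TaskMetrics)
      } else {
        // Large result: store it under a TaskResultBlockId and send only the id back.
        // If that block is evicted before the getter fetches it, the task must be retried.
        IndirectTaskResult(TaskResultBlockId(tid))
      }
    }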

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
new file mode 100644
index 0000000..bfbffdf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import scala.collection.mutable.ArrayBuffer
+
+import java.util.Properties
+
+class FakeTaskSetManager(
+    initPriority: Int,
+    initStageId: Int,
+    initNumTasks: Int,
+    taskScheduler: TaskScheduler,
+    taskSet: TaskSet)
+  extends TaskSetManager(taskScheduler, taskSet) {
+
+  parent = null
+  weight = 1
+  minShare = 2
+  runningTasks = 0
+  priority = initPriority
+  stageId = initStageId
+  name = "TaskSet_"+stageId
+  override val numTasks = initNumTasks
+  tasksSuccessful = 0
+
+  def increaseRunningTasks(taskNum: Int) {
+    runningTasks += taskNum
+    if (parent != null) {
+      parent.increaseRunningTasks(taskNum)
+    }
+  }
+
+  def decreaseRunningTasks(taskNum: Int) {
+    runningTasks -= taskNum
+    if (parent != null) {
+      parent.decreaseRunningTasks(taskNum)
+    }
+  }
+
+  override def addSchedulable(schedulable: Schedulable) {
+  }
+
+  override def removeSchedulable(schedulable: Schedulable) {
+  }
+
+  override def getSchedulableByName(name: String): Schedulable = {
+    return null
+  }
+
+  override def executorLost(executorId: String, host: String): Unit = {
+  }
+
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription] =
+  {
+    if (tasksSuccessful + runningTasks < numTasks) {
+      increaseRunningTasks(1)
+      return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+    }
+    return None
+  }
+
+  override def checkSpeculatableTasks(): Boolean = {
+    return true
+  }
+
+  def taskFinished() {
+    decreaseRunningTasks(1)
+    tasksSuccessful +=1
+    if (tasksSuccessful == numTasks) {
+      parent.removeSchedulable(this)
+    }
+  }
+
+  def abort() {
+    decreaseRunningTasks(runningTasks)
+    parent.removeSchedulable(this)
+  }
+}
+
+class TaskSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
+
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+    new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
+  }
+
+  def resourceOffer(rootPool: Pool): Int = {
+    val taskSetQueue = rootPool.getSortedTaskSetQueue()
+    /* Just for Test*/
+    for (manager <- taskSetQueue) {
+       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
+         manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+    }
+    for (taskSet <- taskSetQueue) {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
+        case Some(task) =>
+          return taskSet.stageId
+        case None => {}
+      }
+    }
+    -1
+  }
+
+  def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
+    assert(resourceOffer(rootPool) === expectedTaskSetId)
+  }
+
+  test("FIFO Scheduler Test") {
+    sc = new SparkContext("local", "TaskSchedulerSuite")
+    val taskScheduler = new TaskScheduler(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new FakeTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+    schedulableBuilder.buildPools()
+
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
+    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
+
+    checkTaskSetId(rootPool, 0)
+    resourceOffer(rootPool)
+    checkTaskSetId(rootPool, 1)
+    resourceOffer(rootPool)
+    taskSetManager1.abort()
+    checkTaskSetId(rootPool, 2)
+  }
+
+  test("Fair Scheduler Test") {
+    sc = new SparkContext("local", "TaskSchedulerSuite")
+    val taskScheduler = new TaskScheduler(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new FakeTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    System.setProperty("spark.scheduler.allocation.file", xmlPath)
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool)
+    schedulableBuilder.buildPools()
+
+    assert(rootPool.getSchedulableByName("default") != null)
+    assert(rootPool.getSchedulableByName("1") != null)
+    assert(rootPool.getSchedulableByName("2") != null)
+    assert(rootPool.getSchedulableByName("3") != null)
+    assert(rootPool.getSchedulableByName("1").minShare === 2)
+    assert(rootPool.getSchedulableByName("1").weight === 1)
+    assert(rootPool.getSchedulableByName("2").minShare === 3)
+    assert(rootPool.getSchedulableByName("2").weight === 1)
+    assert(rootPool.getSchedulableByName("3").minShare === 0)
+    assert(rootPool.getSchedulableByName("3").weight === 1)
+
+    val properties1 = new Properties()
+    properties1.setProperty("spark.scheduler.pool","1")
+    val properties2 = new Properties()
+    properties2.setProperty("spark.scheduler.pool","2")
+
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
+    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
+
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
+    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
+    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
+
+    checkTaskSetId(rootPool, 0)
+    checkTaskSetId(rootPool, 3)
+    checkTaskSetId(rootPool, 3)
+    checkTaskSetId(rootPool, 1)
+    checkTaskSetId(rootPool, 4)
+    checkTaskSetId(rootPool, 2)
+    checkTaskSetId(rootPool, 2)
+    checkTaskSetId(rootPool, 4)
+
+    taskSetManager12.taskFinished()
+    assert(rootPool.getSchedulableByName("1").runningTasks === 3)
+    taskSetManager24.abort()
+    assert(rootPool.getSchedulableByName("2").runningTasks === 2)
+  }
+
+  test("Nested Pool Test") {
+    sc = new SparkContext("local", "TaskSchedulerSuite")
+    val taskScheduler = new TaskScheduler(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new FakeTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
+    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+    rootPool.addSchedulable(pool0)
+    rootPool.addSchedulable(pool1)
+
+    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
+    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+    pool0.addSchedulable(pool00)
+    pool0.addSchedulable(pool01)
+
+    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
+    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+    pool1.addSchedulable(pool10)
+    pool1.addSchedulable(pool11)
+
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
+    pool00.addSchedulable(taskSetManager000)
+    pool00.addSchedulable(taskSetManager001)
+
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
+    pool01.addSchedulable(taskSetManager010)
+    pool01.addSchedulable(taskSetManager011)
+
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
+    pool10.addSchedulable(taskSetManager100)
+    pool10.addSchedulable(taskSetManager101)
+
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
+    pool11.addSchedulable(taskSetManager110)
+    pool11.addSchedulable(taskSetManager111)
+
+    checkTaskSetId(rootPool, 0)
+    checkTaskSetId(rootPool, 4)
+    checkTaskSetId(rootPool, 6)
+    checkTaskSetId(rootPool, 2)
+  }
+}
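
The Fair Scheduler test above loads a fairscheduler.xml from the test resources and then asserts the minShare and weight of pools "1", "2" and "3". That file is not part of this diff, but given those assertions it presumably looks roughly like the following (the root element and tag names follow Spark's documented allocation-file format; the exact contents of the test resource are an assumption):

    <?xml version="1.0"?>
    <allocations>
      <pool name="1">
        <minShare>2</minShare>
        <weight>1</weight>
      </pool>
      <pool name="2">
        <minShare>3</minShare>
        <weight>1</weight>
      </pool>
      <pool name="3">
        <minShare>0</minShare>
        <weight>1</weight>
      </pool>
    </allocations>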

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
new file mode 100644
index 0000000..fe3ea7b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import java.nio.ByteBuffer
+import org.apache.spark.util.{Utils, FakeClock}
+
+class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+    taskScheduler.startedTasks += taskInfo.index
+  }
+
+  override def taskEnded(
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: mutable.Map[Long, Any],
+      taskInfo: TaskInfo,
+      taskMetrics: TaskMetrics) {
+    taskScheduler.endedTasks(taskInfo.index) = reason
+  }
+
+  override def executorGained(execId: String, host: String) {}
+
+  override def executorLost(execId: String) {}
+
+  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+    taskScheduler.taskSetsFailed += taskSet.id
+  }
+}
+
+/**
+ * A mock TaskScheduler implementation that just remembers information about tasks started and
+ * feedback received from the TaskSetManagers. Note that it's important to initialize this with
+ * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
+ * to work, and these are required for locality in TaskSetManager.
+ */
+class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+  extends TaskScheduler(sc)
+{
+  val startedTasks = new ArrayBuffer[Long]
+  val endedTasks = new mutable.HashMap[Long, TaskEndReason]
+  val finishedManagers = new ArrayBuffer[TaskSetManager]
+  val taskSetsFailed = new ArrayBuffer[String]
+
+  val executors = new mutable.HashMap[String, String] ++ liveExecutors
+
+  dagScheduler = new FakeDAGScheduler(this)
+
+  def removeExecutor(execId: String): Unit = executors -= execId
+
+  override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
+
+  override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
+
+  override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+}
+
+class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+  import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+
+  val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+
+  test("TaskSet with no preferences") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = createTaskSet(1)
+    val manager = new TaskSetManager(sched, taskSet)
+
+    // Offer a host with no CPUs
+    assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
+
+    // Offer a host with process-local as the constraint; this should work because the TaskSet
+    // above won't have any locality preferences
+    val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
+    assert(taskOption.isDefined)
+    val task = taskOption.get
+    assert(task.executorId === "exec1")
+    assert(sched.startedTasks.contains(0))
+
+    // Re-offer the host -- now we should get no more tasks
+    assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
+
+    // Tell it the task has finished
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+    assert(sched.endedTasks(0) === Success)
+    assert(sched.finishedManagers.contains(manager))
+  }
+
+  test("multiple offers with no preferences") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = createTaskSet(3)
+    val manager = new TaskSetManager(sched, taskSet)
+
+    // First three offers should all find tasks
+    for (i <- 0 until 3) {
+      val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === "exec1")
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2))
+
+    // Re-offer the host -- now we should get no more tasks
+    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+    // Finish the first two tasks
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+    manager.handleSuccessfulTask(1, createTaskResult(1))
+    assert(sched.endedTasks(0) === Success)
+    assert(sched.endedTasks(1) === Success)
+    assert(!sched.finishedManagers.contains(manager))
+
+    // Finish the last task
+    manager.handleSuccessfulTask(2, createTaskResult(2))
+    assert(sched.endedTasks(2) === Success)
+    assert(sched.finishedManagers.contains(manager))
+  }
+
+  test("basic delay scheduling") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = createTaskSet(4,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host2", "exec2")),
+      Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
+      Seq()   // Last task has no locality prefs
+    )
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, clock)
+
+    // First offer host1, exec1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
+
+    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+    // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+    assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
+
+    // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
+
+    // Offer host1, exec1 again, at ANY level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // Offer host1, exec1 again, at ANY level: task 1 should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+    // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+  }
+
+  test("delay scheduling with fallback") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc,
+      ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = createTaskSet(5,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3")),
+      Seq(TaskLocation("host2"))
+    )
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, clock)
+
+    // First offer host1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Offer host1 again: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // Offer host1 again: second task (on host2) should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+    // Offer host1 again: third task (on host2) should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+    // Offer host2: fifth task (also on host2) should get chosen
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
+
+    // Now that we've launched a local task, we should no longer launch the task for host3
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // After another delay, we can go ahead and launch that task non-locally
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
+  }
+
+  test("delay scheduling with failed hosts") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = createTaskSet(3,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3"))
+    )
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, clock)
+
+    // First offer host1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Offer host1 again: third task should be chosen immediately because host3 is not up
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+    // After this, nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+    // Now mark host2 as dead
+    sched.removeExecutor("exec2")
+    manager.executorLost("exec2", "host2")
+
+    // Task 1 should immediately be launched on host1 because its original host is gone
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+    // Now that all tasks have launched, nothing new should be launched anywhere else
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+  }
+
+  test("task result lost") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = createTaskSet(1)
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, clock)
+
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Tell it the task has finished but the result was lost.
+    manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
+    assert(sched.endedTasks(0) === TaskResultLost)
+
+    // Re-offer the host -- now we should get task 0 again.
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+  }
+
+  test("repeated failures lead to task set abortion") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = createTaskSet(1)
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, clock)
+
+    // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
+    // after the last failure.
+    (0 until manager.MAX_TASK_FAILURES).foreach { index =>
+      val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
+      assert(offerResult != None,
+        "Expect resource offer on iteration %s to return a task".format(index))
+      assert(offerResult.get.index === 0)
+      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
+      if (index < manager.MAX_TASK_FAILURES) {
+        assert(!sched.taskSetsFailed.contains(taskSet.id))
+      } else {
+        assert(sched.taskSetsFailed.contains(taskSet.id))
+      }
+    }
+  }
+
+
+  /**
+   * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+   * locations for each task (given as varargs) if this sequence is not empty.
+   */
+  def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+      throw new IllegalArgumentException("Wrong number of task locations")
+    }
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+    }
+    new TaskSet(tasks, 0, 0, 0, null)
+  }
+
+  def createTaskResult(id: Int): DirectTaskResult[Int] = {
+    new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+  }
+}
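
All of the delay-scheduling tests above share one rhythm: offer resources, advance the FakeClock by LOCALITY_WAIT, and check which task index becomes launchable once the locality wait expires. A compressed, illustrative sketch of that rhythm using only helpers defined in this file (the test name is hypothetical; the expected behavior mirrors the opening steps of "delay scheduling with fallback"):

    test("locality wait expiry (illustrative sketch)") {
      sc = new SparkContext("local", "test")
      val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
      // Task 0 prefers host1, task 1 prefers host2.
      val taskSet = createTaskSet(2, Seq(TaskLocation("host1")), Seq(TaskLocation("host2")))
      val clock = new FakeClock
      val manager = new TaskSetManager(sched, taskSet, clock)

      // host1 offer: the node-local task 0 is chosen immediately.
      assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
      // Another host1 offer: task 1 prefers host2, so nothing runs yet.
      assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)

      clock.advance(LOCALITY_WAIT)

      // Once the locality wait has passed, task 1 may run non-locally on host1.
      assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
    }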

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
deleted file mode 100644
index 95d3553..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
-import scala.collection.mutable.ArrayBuffer
-
-import java.util.Properties
-
-class FakeTaskSetManager(
-    initPriority: Int,
-    initStageId: Int,
-    initNumTasks: Int,
-    clusterScheduler: ClusterScheduler,
-    taskSet: TaskSet)
-  extends ClusterTaskSetManager(clusterScheduler, taskSet) {
-
-  parent = null
-  weight = 1
-  minShare = 2
-  runningTasks = 0
-  priority = initPriority
-  stageId = initStageId
-  name = "TaskSet_"+stageId
-  override val numTasks = initNumTasks
-  tasksSuccessful = 0
-
-  def increaseRunningTasks(taskNum: Int) {
-    runningTasks += taskNum
-    if (parent != null) {
-      parent.increaseRunningTasks(taskNum)
-    }
-  }
-
-  def decreaseRunningTasks(taskNum: Int) {
-    runningTasks -= taskNum
-    if (parent != null) {
-      parent.decreaseRunningTasks(taskNum)
-    }
-  }
-
-  override def addSchedulable(schedulable: Schedulable) {
-  }
-
-  override def removeSchedulable(schedulable: Schedulable) {
-  }
-
-  override def getSchedulableByName(name: String): Schedulable = {
-    return null
-  }
-
-  override def executorLost(executorId: String, host: String): Unit = {
-  }
-
-  override def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription] =
-  {
-    if (tasksSuccessful + runningTasks < numTasks) {
-      increaseRunningTasks(1)
-      return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
-    }
-    return None
-  }
-
-  override def checkSpeculatableTasks(): Boolean = {
-    return true
-  }
-
-  def taskFinished() {
-    decreaseRunningTasks(1)
-    tasksSuccessful +=1
-    if (tasksSuccessful == numTasks) {
-      parent.removeSchedulable(this)
-    }
-  }
-
-  def abort() {
-    decreaseRunningTasks(runningTasks)
-    parent.removeSchedulable(this)
-  }
-}
-
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
-
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
-    new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
-  }
-
-  def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue()
-    /* Just for Test*/
-    for (manager <- taskSetQueue) {
-       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
-         manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
-    }
-    for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
-        case Some(task) =>
-          return taskSet.stageId
-        case None => {}
-      }
-    }
-    -1
-  }
-
-  def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
-    assert(resourceOffer(rootPool) === expectedTaskSetId)
-  }
-
-  test("FIFO Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
-    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
-    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
-    schedulableBuilder.buildPools()
-
-    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
-    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
-    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
-    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
-    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-
-    checkTaskSetId(rootPool, 0)
-    resourceOffer(rootPool)
-    checkTaskSetId(rootPool, 1)
-    resourceOffer(rootPool)
-    taskSetManager1.abort()
-    checkTaskSetId(rootPool, 2)
-  }
-
-  test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
-    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool)
-    schedulableBuilder.buildPools()
-
-    assert(rootPool.getSchedulableByName("default") != null)
-    assert(rootPool.getSchedulableByName("1") != null)
-    assert(rootPool.getSchedulableByName("2") != null)
-    assert(rootPool.getSchedulableByName("3") != null)
-    assert(rootPool.getSchedulableByName("1").minShare === 2)
-    assert(rootPool.getSchedulableByName("1").weight === 1)
-    assert(rootPool.getSchedulableByName("2").minShare === 3)
-    assert(rootPool.getSchedulableByName("2").weight === 1)
-    assert(rootPool.getSchedulableByName("3").minShare === 0)
-    assert(rootPool.getSchedulableByName("3").weight === 1)
-
-    val properties1 = new Properties()
-    properties1.setProperty("spark.scheduler.pool","1")
-    val properties2 = new Properties()
-    properties2.setProperty("spark.scheduler.pool","2")
-
-    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
-    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
-    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
-    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
-    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-
-    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
-    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
-    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
-
-    checkTaskSetId(rootPool, 0)
-    checkTaskSetId(rootPool, 3)
-    checkTaskSetId(rootPool, 3)
-    checkTaskSetId(rootPool, 1)
-    checkTaskSetId(rootPool, 4)
-    checkTaskSetId(rootPool, 2)
-    checkTaskSetId(rootPool, 2)
-    checkTaskSetId(rootPool, 4)
-
-    taskSetManager12.taskFinished()
-    assert(rootPool.getSchedulableByName("1").runningTasks === 3)
-    taskSetManager24.abort()
-    assert(rootPool.getSchedulableByName("2").runningTasks === 2)
-  }
-
-  test("Nested Pool Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
-    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
-    rootPool.addSchedulable(pool0)
-    rootPool.addSchedulable(pool1)
-
-    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
-    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
-    pool0.addSchedulable(pool00)
-    pool0.addSchedulable(pool01)
-
-    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
-    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
-    pool1.addSchedulable(pool10)
-    pool1.addSchedulable(pool11)
-
-    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
-    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
-    pool00.addSchedulable(taskSetManager000)
-    pool00.addSchedulable(taskSetManager001)
-
-    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
-    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
-    pool01.addSchedulable(taskSetManager010)
-    pool01.addSchedulable(taskSetManager011)
-
-    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
-    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
-    pool10.addSchedulable(taskSetManager100)
-    pool10.addSchedulable(taskSetManager101)
-
-    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
-    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
-    pool11.addSchedulable(taskSetManager110)
-    pool11.addSchedulable(taskSetManager111)
-
-    checkTaskSetId(rootPool, 0)
-    checkTaskSetId(rootPool, 4)
-    checkTaskSetId(rootPool, 6)
-    checkTaskSetId(rootPool, 2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
deleted file mode 100644
index b97f2b1..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable
-
-import org.scalatest.FunSuite
-
-import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.executor.TaskMetrics
-import java.nio.ByteBuffer
-import org.apache.spark.util.{Utils, FakeClock}
-
-class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
-  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-    taskScheduler.startedTasks += taskInfo.index
-  }
-
-  override def taskEnded(
-      task: Task[_],
-      reason: TaskEndReason,
-      result: Any,
-      accumUpdates: mutable.Map[Long, Any],
-      taskInfo: TaskInfo,
-      taskMetrics: TaskMetrics) {
-    taskScheduler.endedTasks(taskInfo.index) = reason
-  }
-
-  override def executorGained(execId: String, host: String) {}
-
-  override def executorLost(execId: String) {}
-
-  override def taskSetFailed(taskSet: TaskSet, reason: String) {
-    taskScheduler.taskSetsFailed += taskSet.id
-  }
-}
-
-/**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
- * feedback received from the TaskSetManagers. Note that it's important to initialize this with
- * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
- */
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
-  extends ClusterScheduler(sc)
-{
-  val startedTasks = new ArrayBuffer[Long]
-  val endedTasks = new mutable.HashMap[Long, TaskEndReason]
-  val finishedManagers = new ArrayBuffer[TaskSetManager]
-  val taskSetsFailed = new ArrayBuffer[String]
-
-  val executors = new mutable.HashMap[String, String] ++ liveExecutors
-
-  dagScheduler = new FakeDAGScheduler(this)
-
-  def removeExecutor(execId: String): Unit = executors -= execId
-
-  override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
-
-  override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
-
-  override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
-}
-
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
-  import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
-
-  val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
-
-  test("TaskSet with no preferences") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(1)
-    val manager = new ClusterTaskSetManager(sched, taskSet)
-
-    // Offer a host with no CPUs
-    assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
-
-    // Offer a host with process-local as the constraint; this should work because the TaskSet
-    // above won't have any locality preferences
-    val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
-    assert(taskOption.isDefined)
-    val task = taskOption.get
-    assert(task.executorId === "exec1")
-    assert(sched.startedTasks.contains(0))
-
-    // Re-offer the host -- now we should get no more tasks
-    assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
-
-    // Tell it the task has finished
-    manager.handleSuccessfulTask(0, createTaskResult(0))
-    assert(sched.endedTasks(0) === Success)
-    assert(sched.finishedManagers.contains(manager))
-  }
-
-  test("multiple offers with no preferences") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(3)
-    val manager = new ClusterTaskSetManager(sched, taskSet)
-
-    // First three offers should all find tasks
-    for (i <- 0 until 3) {
-      val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
-      assert(taskOption.isDefined)
-      val task = taskOption.get
-      assert(task.executorId === "exec1")
-    }
-    assert(sched.startedTasks.toSet === Set(0, 1, 2))
-
-    // Re-offer the host -- now we should get no more tasks
-    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
-    // Finish the first two tasks
-    manager.handleSuccessfulTask(0, createTaskResult(0))
-    manager.handleSuccessfulTask(1, createTaskResult(1))
-    assert(sched.endedTasks(0) === Success)
-    assert(sched.endedTasks(1) === Success)
-    assert(!sched.finishedManagers.contains(manager))
-
-    // Finish the last task
-    manager.handleSuccessfulTask(2, createTaskResult(2))
-    assert(sched.endedTasks(2) === Success)
-    assert(sched.finishedManagers.contains(manager))
-  }
-
-  test("basic delay scheduling") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
-    val taskSet = createTaskSet(4,
-      Seq(TaskLocation("host1", "exec1")),
-      Seq(TaskLocation("host2", "exec2")),
-      Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
-      Seq()   // Last task has no locality prefs
-    )
-    val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
-    // First offer host1, exec1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
-    // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
-
-    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
-    clock.advance(LOCALITY_WAIT)
-
-    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
-    // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
-    assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
-
-    // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
-
-    // Offer host1, exec1 again, at ANY level: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
-    clock.advance(LOCALITY_WAIT)
-
-    // Offer host1, exec1 again, at ANY level: task 1 should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
-    // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-  }
-
-  test("delay scheduling with fallback") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc,
-      ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
-    val taskSet = createTaskSet(5,
-      Seq(TaskLocation("host1")),
-      Seq(TaskLocation("host2")),
-      Seq(TaskLocation("host2")),
-      Seq(TaskLocation("host3")),
-      Seq(TaskLocation("host2"))
-    )
-    val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
-    // First offer host1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
-    // Offer host1 again: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
-    clock.advance(LOCALITY_WAIT)
-
-    // Offer host1 again: second task (on host2) should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
-    // Offer host1 again: third task (on host2) should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
-
-    // Offer host2: fifth task (also on host2) should get chosen
-    assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
-
-    // Now that we've launched a local task, we should no longer launch the task for host3
-    assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
-
-    clock.advance(LOCALITY_WAIT)
-
-    // After another delay, we can go ahead and launch that task non-locally
-    assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
-  }
-
-  test("delay scheduling with failed hosts") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
-    val taskSet = createTaskSet(3,
-      Seq(TaskLocation("host1")),
-      Seq(TaskLocation("host2")),
-      Seq(TaskLocation("host3"))
-    )
-    val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
-    // First offer host1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
-    // Offer host1 again: third task should be chosen immediately because host3 is not up
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
-
-    // After this, nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
-    // Now mark host2 as dead
-    sched.removeExecutor("exec2")
-    manager.executorLost("exec2", "host2")
-
-    // Task 1 should immediately be launched on host1 because its original host is gone
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
-    // Now that all tasks have launched, nothing new should be launched anywhere else
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-    assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
-  }
-
-  test("task result lost") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(1)
-    val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
-    // Tell it the task has finished but the result was lost.
-    manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
-    assert(sched.endedTasks(0) === TaskResultLost)
-
-    // Re-offer the host -- now we should get task 0 again.
-    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-  }
-
-  test("repeated failures lead to task set abortion") {
-    sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(1)
-    val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
-    // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
-    // after the last failure.
-    (1 to manager.MAX_TASK_FAILURES).foreach { index =>
-      val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
-      assert(offerResult != None,
-        "Expect resource offer on iteration %s to return a task".format(index))
-      assert(offerResult.get.index === 0)
-      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
-      if (index < manager.MAX_TASK_FAILURES) {
-        assert(!sched.taskSetsFailed.contains(taskSet.id))
-      } else {
-        assert(sched.taskSetsFailed.contains(taskSet.id))
-      }
-    }
-  }
-
-
-  /**
-   * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
-   * locations for each task (given as varargs) if this sequence is not empty.
-   */
-  def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
-      throw new IllegalArgumentException("Wrong number of task locations")
-    }
-    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
-      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
-    }
-    new TaskSet(tasks, 0, 0, 0, null)
-  }
-
-  def createTaskResult(id: Int): DirectTaskResult[Int] = {
-    new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
-  }
-}
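
The delay-scheduling tests in the suite above never sleep: they pass a FakeClock into ClusterTaskSetManager and advance it by LOCALITY_WAIT (spark.locality.wait) between offers to force the locality level to relax from PROCESS_LOCAL to NODE_LOCAL and finally ANY. The FakeClock source is not part of this diff; a minimal sketch of such a manually advanced clock, assuming it only needs to report a time and be advanced by the test, would be:

// Hypothetical stand-in for org.apache.spark.util.FakeClock; the real class may differ.
class ManualClock(start: Long = 0L) {
  private var now = start
  def getTime(): Long = now                            // read by the code under test
  def advance(millis: Long): Unit = { now += millis }  // called by the test instead of sleeping
}

Driving time explicitly keeps tests like "basic delay scheduling" deterministic and fast, since the default 3000 ms locality wait never actually has to elapse.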

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
deleted file mode 100644
index 0f01515..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
-
-class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
-  override def runTask(context: TaskContext): Int = 0
-
-  override def preferredLocations: Seq[TaskLocation] = prefLocs
-}
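
FakeTask above is the entire test double used by these suites: a Task[Int] whose runTask does nothing, carrying only the stage id and preferred locations that the scheduling code inspects. Stripped of the Spark types, the same stub pattern looks roughly like this (illustrative trait and names, not the Spark API):

// A task reduced to the inputs the scheduler cares about; the work itself is a no-op.
trait SchedulableTask[T] {
  def run(): T
  def preferredHosts: Seq[String]
}

class StubTask(hosts: Seq[String] = Nil) extends SchedulableTask[Int] {
  override def run(): Int = 0                       // no real computation
  override def preferredHosts: Seq[String] = hosts  // only the locality inputs matter
}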

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
deleted file mode 100644
index 77d3038..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.nio.ByteBuffer
-
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
-import org.apache.spark.storage.TaskResultBlockId
-
-/**
- * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
- *
- * Used to test the case where a BlockManager evicts the task result (or dies) before the
- * TaskResult is retrieved.
- */
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
-  extends TaskResultGetter(sparkEnv, scheduler) {
-  var removedResult = false
-
-  override def enqueueSuccessfulTask(
-    taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
-    if (!removedResult) {
-      // Only remove the result once, since we'd like to test the case where the task eventually
-      // succeeds.
-      serializer.get().deserialize[TaskResult[_]](serializedData) match {
-        case IndirectTaskResult(blockId) =>
-          sparkEnv.blockManager.master.removeBlock(blockId)
-        case directResult: DirectTaskResult[_] =>
-          taskSetManager.abort("Internal error: expect only indirect results") 
-      }
-      serializedData.rewind()
-      removedResult = true
-    }
-    super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
-  } 
-}
-
-/**
- * Tests related to handling task results (both direct and indirect).
- */
-class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
-  with LocalSparkContext {
-
-  override def beforeAll {
-    // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
-    // as we can make it) so the tests don't take too long.
-    System.setProperty("spark.akka.frameSize", "1")
-  }
-
-  before {
-    sc = new SparkContext("local", "test")
-  }
-
-  override def afterAll {
-    System.clearProperty("spark.akka.frameSize")
-  }
-
-  test("handling results smaller than Akka frame size") {
-    val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
-    assert(result === 2)
-  }
-
-  test("handling results larger than Akka frame size") { 
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
-    assert(result === 1.to(akkaFrameSize).toArray)
-
-    val RESULT_BLOCK_ID = TaskResultBlockId(0)
-    assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
-      "Expect result to be removed from the block manager.")
-  }
-
-  test("task retried if result missing from block manager") {
-    // If this test hangs, it's probably because no resource offers were made after the task
-    // failed.
-    val scheduler: ClusterScheduler = sc.taskScheduler match {
-      case clusterScheduler: ClusterScheduler =>
-        clusterScheduler
-      case _ =>
-        assert(false, "Expect local cluster to use ClusterScheduler")
-        throw new ClassCastException
-    }
-    scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
-    assert(result === 1.to(akkaFrameSize).toArray)
-
-    // Make sure two tasks were run (the one that failed, plus a second, retried one).
-    assert(scheduler.nextTaskId.get() === 2)
-  }
-}
-
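
The suite above covers both result paths: a result small enough for an Akka frame is returned directly, while a larger one is written to the block manager and only its block id travels over the wire, which is why beforeAll pins spark.akka.frameSize to its minimum so that a "large" result is cheap to produce. A sketch of that size-based split, with illustrative names rather than Spark's actual TaskResult serialization:

object ResultPathSketch {
  sealed trait SketchResult
  case class Direct(bytes: Array[Byte]) extends SketchResult
  case class Indirect(blockId: String) extends SketchResult

  // If the serialized result fits under the frame limit, send it back directly;
  // otherwise store it (putBlock) and send only the block id.
  def packageResult(bytes: Array[Byte],
                    maxFrameBytes: Int,
                    putBlock: Array[Byte] => String): SketchResult =
    if (bytes.length <= maxFrameBytes) Direct(bytes)
    else Indirect(putBlock(bytes))
}

The "task retried if result missing from block manager" test then deletes the stored block before the getter can fetch it, so the first attempt ends in TaskResultLost and the task is re-offered, which is what the final nextTaskId === 2 assertion verifies.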