Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/25 01:35:34 UTC
[03/20] git commit: Fixed most issues with unit tests
Fixed most issues with unit tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a124658e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a124658e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a124658e
Branch: refs/heads/master
Commit: a124658e53a5abeda00a2582385a294c8e452d21
Parents: 5e91495
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Oct 30 19:29:38 2013 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Oct 30 19:29:38 2013 -0700
----------------------------------------------------------------------
.../spark/scheduler/DAGSchedulerSuite.scala | 94 +++---
.../org/apache/spark/scheduler/FakeTask.scala | 26 ++
.../spark/scheduler/TaskResultGetterSuite.scala | 111 +++++++
.../spark/scheduler/TaskSchedulerSuite.scala | 265 ++++++++++++++++
.../spark/scheduler/TaskSetManagerSuite.scala | 317 ++++++++++++++++++
.../cluster/ClusterSchedulerSuite.scala | 267 ----------------
.../cluster/ClusterTaskSetManagerSuite.scala | 318 -------------------
.../spark/scheduler/cluster/FakeTask.scala | 27 --
.../cluster/TaskResultGetterSuite.scala | 112 -------
9 files changed, 767 insertions(+), 770 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 00f2fdd..394a1bb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -34,6 +34,24 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
/**
+ * TaskScheduler that records the task sets that the DAGScheduler requested to be executed.
+ */
+class TaskSetRecordingTaskScheduler(sc: SparkContext) extends TaskScheduler(sc) {
+ /** Set of TaskSets the DAGScheduler has requested to be executed. */
+ val taskSets = scala.collection.mutable.Buffer[TaskSet]()
+ override def start() = {}
+ override def stop() = {}
+ override def submitTasks(taskSet: TaskSet) = {
+ // normally done by TaskSetManager
+ taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
+ taskSets += taskSet
+ }
+ override def cancelTasks(stageId: Int) {}
+ override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+ override def defaultParallelism() = 2
+}
+
+/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
* rather than spawning an event loop thread as happens in the real code. They use EasyMock
* to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
@@ -46,24 +64,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
* and capturing the resulting TaskSets from the mock TaskScheduler.
*/
class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
- /** Set of TaskSets the DAGScheduler has requested executed. */
- val taskSets = scala.collection.mutable.Buffer[TaskSet]()
- val taskScheduler = new TaskScheduler() {
- override def rootPool: Pool = null
- override def schedulingMode: SchedulingMode = SchedulingMode.NONE
- override def start() = {}
- override def stop() = {}
- override def submitTasks(taskSet: TaskSet) = {
- // normally done by TaskSetManager
- taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
- taskSets += taskSet
- }
- override def cancelTasks(stageId: Int) {}
- override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
- override def defaultParallelism() = 2
- }
-
+ var taskScheduler: TaskSetRecordingTaskScheduler = null
var mapOutputTracker: MapOutputTrackerMaster = null
var scheduler: DAGScheduler = null
@@ -96,7 +97,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
before {
sc = new SparkContext("local", "DAGSchedulerSuite")
- taskSets.clear()
+ taskScheduler = new TaskSetRecordingTaskScheduler(sc)
+ taskScheduler.taskSets.clear()
cacheLocations.clear()
results.clear()
mapOutputTracker = new MapOutputTrackerMaster()
@@ -204,7 +206,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
test("run trivial job") {
val rdd = makeRdd(1, Nil)
submit(rdd, Array(0))
- complete(taskSets(0), List((Success, 42)))
+ complete(taskScheduler.taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -225,7 +227,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val baseRdd = makeRdd(1, Nil)
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0))
- complete(taskSets(0), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -235,7 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
cacheLocations(baseRdd.id -> 0) =
Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
submit(finalRdd, Array(0))
- val taskSet = taskSets(0)
+ val taskSet = taskScheduler.taskSets(0)
assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
complete(taskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
@@ -243,7 +245,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
test("trivial job failure") {
submit(makeRdd(1, Nil), Array(0))
- failed(taskSets(0), "some failure")
+ failed(taskScheduler.taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted: some failure")
}
@@ -253,12 +255,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val shuffleId = shuffleDep.shuffleId
val reduceRdd = makeRdd(1, List(shuffleDep))
submit(reduceRdd, Array(0))
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
- complete(taskSets(1), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -268,11 +270,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val shuffleId = shuffleDep.shuffleId
val reduceRdd = makeRdd(2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// the 2nd ResultTask failed
- complete(taskSets(1), Seq(
+ complete(taskScheduler.taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
// this will get called
@@ -280,10 +282,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
- complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskScheduler.taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
// we can see both result blocks now
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
- complete(taskSets(3), Seq((Success, 43)))
+ complete(taskScheduler.taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
}
@@ -299,7 +301,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
val noAccum = Map[Long, Any]()
- val taskSet = taskSets(0)
+ val taskSet = taskScheduler.taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
// should work because it's a non-failed host
@@ -311,7 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
- complete(taskSets(1), Seq((Success, 42), (Success, 43)))
+ complete(taskScheduler.taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
}
@@ -326,14 +328,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(ExecutorLost("exec-hostA"))
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it as failed and waiting.
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// have hostC complete the resubmitted task
- complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+ complete(taskScheduler.taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
- complete(taskSets(2), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -345,23 +347,23 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val finalRdd = makeRdd(1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
// have the first stage complete normally
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
// have the second stage complete normally
- complete(taskSets(1), Seq(
+ complete(taskScheduler.taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
- complete(taskSets(2), Seq(
+ complete(taskScheduler.taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
scheduler.resubmitFailedStages()
- complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
- complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
- complete(taskSets(5), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+ complete(taskScheduler.taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskScheduler.taskSets(5), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -375,24 +377,24 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
// complete stage 2
- complete(taskSets(0), Seq(
+ complete(taskScheduler.taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
// complete stage 1
- complete(taskSets(1), Seq(
+ complete(taskScheduler.taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// pretend stage 0 failed because hostA went down
- complete(taskSets(2), Seq(
+ complete(taskScheduler.taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
scheduler.resubmitFailedStages()
- assertLocations(taskSets(3), Seq(Seq("hostD")))
+ assertLocations(taskScheduler.taskSets(3), Seq(Seq("hostD")))
// allow hostD to recover
- complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
- complete(taskSets(4), Seq((Success, 42)))
+ complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
+ complete(taskScheduler.taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
new file mode 100644
index 0000000..0b90c4e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.TaskContext
+
+class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
+ override def runTask(context: TaskContext): Int = 0
+
+ override def preferredLocations: Seq[TaskLocation] = prefLocs
+}
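
A brief usage sketch for the new FakeTask helper, as the suites in this commit use it
(TaskLocation's two-argument factory and the five-argument TaskSet constructor appear
elsewhere in this diff; the sketch itself is not part of the commit):

    // A fake task pinned to host1/exec1; its runTask always returns 0.
    val task = new FakeTask(0, Seq(TaskLocation("host1", "exec1")))
    // Wrap it in a TaskSet the way the suites do; Array[Task[_]] sidesteps array invariance.
    val taskSet = new TaskSet(Array[Task[_]](task), 0, 0, 0, null)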
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
new file mode 100644
index 0000000..30e6bc5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.storage.TaskResultBlockId
+
+/**
+ * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
+ *
+ * Used to test the case where a BlockManager evicts the task result (or dies) before the
+ * TaskResult is retrieved.
+ */
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskScheduler)
+ extends TaskResultGetter(sparkEnv, scheduler) {
+ var removedResult = false
+
+ override def enqueueSuccessfulTask(
+ taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ if (!removedResult) {
+ // Only remove the result once, since we'd like to test the case where the task eventually
+ // succeeds.
+ serializer.get().deserialize[TaskResult[_]](serializedData) match {
+ case IndirectTaskResult(blockId) =>
+ sparkEnv.blockManager.master.removeBlock(blockId)
+ case directResult: DirectTaskResult[_] =>
+ taskSetManager.abort("Internal error: expect only indirect results")
+ }
+ serializedData.rewind()
+ removedResult = true
+ }
+ super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
+ }
+}
+
+/**
+ * Tests related to handling task results (both direct and indirect).
+ */
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
+ with LocalSparkContext {
+
+ override def beforeAll {
+ // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+ // as we can make it) so the tests don't take too long.
+ System.setProperty("spark.akka.frameSize", "1")
+ }
+
+ before {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll {
+ System.clearProperty("spark.akka.frameSize")
+ }
+
+ test("handling results smaller than Akka frame size") {
+ val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+ assert(result === 2)
+ }
+
+ test("handling results larger than Akka frame size") {
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(akkaFrameSize).toArray)
+
+ val RESULT_BLOCK_ID = TaskResultBlockId(0)
+ assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
+ "Expect result to be removed from the block manager.")
+ }
+
+ test("task retried if result missing from block manager") {
+ // If this test hangs, it's probably because no resource offers were made after the task
+ // failed.
+ val scheduler: TaskScheduler = sc.taskScheduler match {
+ case clusterScheduler: TaskScheduler =>
+ clusterScheduler
+ case _ =>
+ assert(false, "Expect local cluster to use TaskScheduler")
+ throw new ClassCastException
+ }
+ scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(akkaFrameSize).toArray)
+
+ // Make sure two tasks were run (one that failed, and a second that was retried).
+ assert(scheduler.nextTaskId.get() === 2)
+ }
+}
+
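
The trick that drives this suite, shown in isolation: shrink the Akka frame size so that
even a modest task result must travel indirectly through the BlockManager rather than
inline in the status update. A sketch under the same assumptions the suite makes (the
frameSize property takes the smallest integer value per the comment above, and the
frame-size config key is the one the suite itself reads):

    System.setProperty("spark.akka.frameSize", "1")  // as small as the setting allows
    val sc = new SparkContext("local", "frame-size-demo")
    val frameBytes =
      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
    // An Array[Int] of frameBytes elements serializes to well over frameBytes bytes,
    // so the executor must return an IndirectTaskResult via the BlockManager.
    val result = sc.parallelize(Seq(1), 1).map(_ => 1.to(frameBytes).toArray).reduce((x, y) => x)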
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
new file mode 100644
index 0000000..bfbffdf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import scala.collection.mutable.ArrayBuffer
+
+import java.util.Properties
+
+class FakeTaskSetManager(
+ initPriority: Int,
+ initStageId: Int,
+ initNumTasks: Int,
+ taskScheduler: TaskScheduler,
+ taskSet: TaskSet)
+ extends TaskSetManager(taskScheduler, taskSet) {
+
+ parent = null
+ weight = 1
+ minShare = 2
+ runningTasks = 0
+ priority = initPriority
+ stageId = initStageId
+ name = "TaskSet_"+stageId
+ override val numTasks = initNumTasks
+ tasksSuccessful = 0
+
+ def increaseRunningTasks(taskNum: Int) {
+ runningTasks += taskNum
+ if (parent != null) {
+ parent.increaseRunningTasks(taskNum)
+ }
+ }
+
+ def decreaseRunningTasks(taskNum: Int) {
+ runningTasks -= taskNum
+ if (parent != null) {
+ parent.decreaseRunningTasks(taskNum)
+ }
+ }
+
+ override def addSchedulable(schedulable: Schedulable) {
+ }
+
+ override def removeSchedulable(schedulable: Schedulable) {
+ }
+
+ override def getSchedulableByName(name: String): Schedulable = {
+ return null
+ }
+
+ override def executorLost(executorId: String, host: String): Unit = {
+ }
+
+ override def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
+ : Option[TaskDescription] =
+ {
+ if (tasksSuccessful + runningTasks < numTasks) {
+ increaseRunningTasks(1)
+ return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+ }
+ return None
+ }
+
+ override def checkSpeculatableTasks(): Boolean = {
+ return true
+ }
+
+ def taskFinished() {
+ decreaseRunningTasks(1)
+ tasksSuccessful += 1
+ if (tasksSuccessful == numTasks) {
+ parent.removeSchedulable(this)
+ }
+ }
+
+ def abort() {
+ decreaseRunningTasks(runningTasks)
+ parent.removeSchedulable(this)
+ }
+}
+
+class TaskSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
+
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+ new FakeTaskSetManager(priority, stage, numTasks, cs, taskSet)
+ }
+
+ def resourceOffer(rootPool: Pool): Int = {
+ val taskSetQueue = rootPool.getSortedTaskSetQueue()
+ /* Just for test: log the sorted queue state. */
+ for (manager <- taskSetQueue) {
+ logInfo("parentName:%s, parent running tasks:%d, name:%s, runningTasks:%d".format(
+ manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+ }
+ for (taskSet <- taskSetQueue) {
+ taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
+ case Some(task) =>
+ return taskSet.stageId
+ case None => {}
+ }
+ }
+ -1
+ }
+
+ def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
+ assert(resourceOffer(rootPool) === expectedTaskSetId)
+ }
+
+ test("FIFO Scheduler Test") {
+ sc = new SparkContext("local", "TaskSchedulerSuite")
+ val taskScheduler = new TaskScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new FakeTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray, 0, 0, 0, null)
+
+ val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+ val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+ schedulableBuilder.buildPools()
+
+ val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+ val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+ val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
+ schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+ schedulableBuilder.addTaskSetManager(taskSetManager1, null)
+ schedulableBuilder.addTaskSetManager(taskSetManager2, null)
+
+ checkTaskSetId(rootPool, 0)
+ resourceOffer(rootPool)
+ checkTaskSetId(rootPool, 1)
+ resourceOffer(rootPool)
+ taskSetManager1.abort()
+ checkTaskSetId(rootPool, 2)
+ }
+
+ test("Fair Scheduler Test") {
+ sc = new SparkContext("local", "TaskSchedulerSuite")
+ val taskScheduler = new TaskScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new FakeTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray, 0, 0, 0, null)
+
+ val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+ System.setProperty("spark.scheduler.allocation.file", xmlPath)
+ val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool)
+ schedulableBuilder.buildPools()
+
+ assert(rootPool.getSchedulableByName("default") != null)
+ assert(rootPool.getSchedulableByName("1") != null)
+ assert(rootPool.getSchedulableByName("2") != null)
+ assert(rootPool.getSchedulableByName("3") != null)
+ assert(rootPool.getSchedulableByName("1").minShare === 2)
+ assert(rootPool.getSchedulableByName("1").weight === 1)
+ assert(rootPool.getSchedulableByName("2").minShare === 3)
+ assert(rootPool.getSchedulableByName("2").weight === 1)
+ assert(rootPool.getSchedulableByName("3").minShare === 0)
+ assert(rootPool.getSchedulableByName("3").weight === 1)
+
+ val properties1 = new Properties()
+ properties1.setProperty("spark.scheduler.pool","1")
+ val properties2 = new Properties()
+ properties2.setProperty("spark.scheduler.pool","2")
+
+ val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+ val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+ val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
+ schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
+ schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
+ schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
+
+ val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+ val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
+ schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
+ schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
+
+ checkTaskSetId(rootPool, 0)
+ checkTaskSetId(rootPool, 3)
+ checkTaskSetId(rootPool, 3)
+ checkTaskSetId(rootPool, 1)
+ checkTaskSetId(rootPool, 4)
+ checkTaskSetId(rootPool, 2)
+ checkTaskSetId(rootPool, 2)
+ checkTaskSetId(rootPool, 4)
+
+ taskSetManager12.taskFinished()
+ assert(rootPool.getSchedulableByName("1").runningTasks === 3)
+ taskSetManager24.abort()
+ assert(rootPool.getSchedulableByName("2").runningTasks === 2)
+ }
+
+ test("Nested Pool Test") {
+ sc = new SparkContext("local", "TaskSchedulerSuite")
+ val taskScheduler = new TaskScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new FakeTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray, 0, 0, 0, null)
+
+ val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+ val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
+ val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+ rootPool.addSchedulable(pool0)
+ rootPool.addSchedulable(pool1)
+
+ val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
+ val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+ pool0.addSchedulable(pool00)
+ pool0.addSchedulable(pool01)
+
+ val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
+ val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+ pool1.addSchedulable(pool10)
+ pool1.addSchedulable(pool11)
+
+ val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+ val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
+ pool00.addSchedulable(taskSetManager000)
+ pool00.addSchedulable(taskSetManager001)
+
+ val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+ val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
+ pool01.addSchedulable(taskSetManager010)
+ pool01.addSchedulable(taskSetManager011)
+
+ val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+ val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
+ pool10.addSchedulable(taskSetManager100)
+ pool10.addSchedulable(taskSetManager101)
+
+ val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+ val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
+ pool11.addSchedulable(taskSetManager110)
+ pool11.addSchedulable(taskSetManager111)
+
+ checkTaskSetId(rootPool, 0)
+ checkTaskSetId(rootPool, 4)
+ checkTaskSetId(rootPool, 6)
+ checkTaskSetId(rootPool, 2)
+ }
+}
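
For orientation, the pool wiring the three tests above share, distilled (class names and
call shapes exactly as used in this file; someManager stands in for any TaskSetManager,
such as the FakeTaskSetManager defined here):

    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
    val builder = new FIFOSchedulableBuilder(rootPool)
    builder.buildPools()                         // no extra pools for FIFO; FAIR reads fairscheduler.xml
    builder.addTaskSetManager(someManager, null) // null properties, as the FIFO test passes
    // Managers come back in scheduling order; resource offers are made in that order.
    val queue = rootPool.getSortedTaskSetQueue()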
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
new file mode 100644
index 0000000..fe3ea7b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import java.nio.ByteBuffer
+import org.apache.spark.util.{Utils, FakeClock}
+
+class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ taskScheduler.startedTasks += taskInfo.index
+ }
+
+ override def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: mutable.Map[Long, Any],
+ taskInfo: TaskInfo,
+ taskMetrics: TaskMetrics) {
+ taskScheduler.endedTasks(taskInfo.index) = reason
+ }
+
+ override def executorGained(execId: String, host: String) {}
+
+ override def executorLost(execId: String) {}
+
+ override def taskSetFailed(taskSet: TaskSet, reason: String) {
+ taskScheduler.taskSetsFailed += taskSet.id
+ }
+}
+
+/**
+ * A mock TaskScheduler implementation that just remembers information about tasks started and
+ * feedback received from the TaskSetManagers. Note that it's important to initialize this with
+ * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
+ * to work, and these are required for locality in TaskSetManager.
+ */
+class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+ extends TaskScheduler(sc)
+{
+ val startedTasks = new ArrayBuffer[Long]
+ val endedTasks = new mutable.HashMap[Long, TaskEndReason]
+ val finishedManagers = new ArrayBuffer[TaskSetManager]
+ val taskSetsFailed = new ArrayBuffer[String]
+
+ val executors = new mutable.HashMap[String, String] ++ liveExecutors
+
+ dagScheduler = new FakeDAGScheduler(this)
+
+ def removeExecutor(execId: String): Unit = executors -= execId
+
+ override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
+
+ override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
+
+ override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+}
+
+class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+ import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+
+ val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+
+ test("TaskSet with no preferences") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val manager = new TaskSetManager(sched, taskSet)
+
+ // Offer a host with no CPUs
+ assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
+
+ // Offer a host with process-local as the constraint; this should work because the TaskSet
+ // above won't have any locality preferences
+ val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === "exec1")
+ assert(sched.startedTasks.contains(0))
+
+ // Re-offer the host -- now we should get no more tasks
+ assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
+
+ // Tell it the task has finished
+ manager.handleSuccessfulTask(0, createTaskResult(0))
+ assert(sched.endedTasks(0) === Success)
+ assert(sched.finishedManagers.contains(manager))
+ }
+
+ test("multiple offers with no preferences") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(3)
+ val manager = new TaskSetManager(sched, taskSet)
+
+ // First three offers should all find tasks
+ for (i <- 0 until 3) {
+ val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === "exec1")
+ }
+ assert(sched.startedTasks.toSet === Set(0, 1, 2))
+
+ // Re-offer the host -- now we should get no more tasks
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ // Finish the first two tasks
+ manager.handleSuccessfulTask(0, createTaskResult(0))
+ manager.handleSuccessfulTask(1, createTaskResult(1))
+ assert(sched.endedTasks(0) === Success)
+ assert(sched.endedTasks(1) === Success)
+ assert(!sched.finishedManagers.contains(manager))
+
+ // Finish the last task
+ manager.handleSuccessfulTask(2, createTaskResult(2))
+ assert(sched.endedTasks(2) === Success)
+ assert(sched.finishedManagers.contains(manager))
+ }
+
+ test("basic delay scheduling") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = createTaskSet(4,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host2", "exec2")),
+ Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
+ Seq() // Last task has no locality prefs
+ )
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, clock)
+
+ // First offer host1, exec1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
+
+ // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+ assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
+
+ // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
+
+ // Offer host1, exec1 again, at ANY level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1, exec1 again, at ANY level: task 1 should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ }
+
+ test("delay scheduling with fallback") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc,
+ ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = createTaskSet(5,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host3")),
+ Seq(TaskLocation("host2"))
+ )
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, clock)
+
+ // First offer host1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1 again: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1 again: second task (on host2) should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Offer host1 again: third task (on host2) should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+ // Offer host2: fifth task (also on host2) should get chosen
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
+
+ // Now that we've launched a local task, we should no longer launch the task for host3
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // After another delay, we can go ahead and launch that task non-locally
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
+ }
+
+ test("delay scheduling with failed hosts") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = createTaskSet(3,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host3"))
+ )
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, clock)
+
+ // First offer host1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1 again: third task should be chosen immediately because host3 is not up
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+ // After this, nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ // Now mark host2 as dead
+ sched.removeExecutor("exec2")
+ manager.executorLost("exec2", "host2")
+
+ // Task 1 should immediately be launched on host1 because its original host is gone
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Now that all tasks have launched, nothing new should be launched anywhere else
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+ }
+
+ test("task result lost") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, clock)
+
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Tell it the task has finished but the result was lost.
+ manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
+ assert(sched.endedTasks(0) === TaskResultLost)
+
+ // Re-offer the host -- now we should get task 0 again.
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ }
+
+ test("repeated failures lead to task set abortion") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, clock)
+
+ // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
+ // after the last failure.
+ (0 until manager.MAX_TASK_FAILURES).foreach { index =>
+ val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
+ assert(offerResult.isDefined,
+ "Expect resource offer on iteration %s to return a task".format(index))
+ assert(offerResult.get.index === 0)
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
+ if (index < manager.MAX_TASK_FAILURES - 1) {
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+ } else {
+ assert(sched.taskSetsFailed.contains(taskSet.id))
+ }
+ }
+ }
+
+
+ /**
+ * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+ * locations for each task (given as varargs) if this sequence is not empty.
+ */
+ def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+ if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+ throw new IllegalArgumentException("Wrong number of task locations")
+ }
+ val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+ new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+ }
+ new TaskSet(tasks, 0, 0, 0, null)
+ }
+
+ def createTaskResult(id: Int): DirectTaskResult[Int] = {
+ new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+ }
+}
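
All of the delay-scheduling tests above follow the same rhythm: an offer below a task's
preferred locality is refused until the FakeClock advances past LOCALITY_WAIT, after which
the manager relaxes to the next locality level. A distilled sketch using the helpers
defined in this file (illustrative, not an additional test):

    sc = new SparkContext("local", "test")
    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    val clock = new FakeClock
    // One task that prefers host2, offered resources only on host1.
    val manager = new TaskSetManager(sched, createTaskSet(1, Seq(TaskLocation("host2"))), clock)
    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)  // wrong host: refused
    clock.advance(LOCALITY_WAIT)                                      // wait out the delay
    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)  // now accepted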
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
deleted file mode 100644
index 95d3553..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
-import scala.collection.mutable.ArrayBuffer
-
-import java.util.Properties
-
-class FakeTaskSetManager(
- initPriority: Int,
- initStageId: Int,
- initNumTasks: Int,
- clusterScheduler: ClusterScheduler,
- taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler, taskSet) {
-
- parent = null
- weight = 1
- minShare = 2
- runningTasks = 0
- priority = initPriority
- stageId = initStageId
- name = "TaskSet_"+stageId
- override val numTasks = initNumTasks
- tasksSuccessful = 0
-
- def increaseRunningTasks(taskNum: Int) {
- runningTasks += taskNum
- if (parent != null) {
- parent.increaseRunningTasks(taskNum)
- }
- }
-
- def decreaseRunningTasks(taskNum: Int) {
- runningTasks -= taskNum
- if (parent != null) {
- parent.decreaseRunningTasks(taskNum)
- }
- }
-
- override def addSchedulable(schedulable: Schedulable) {
- }
-
- override def removeSchedulable(schedulable: Schedulable) {
- }
-
- override def getSchedulableByName(name: String): Schedulable = {
- return null
- }
-
- override def executorLost(executorId: String, host: String): Unit = {
- }
-
- override def resourceOffer(
- execId: String,
- host: String,
- availableCpus: Int,
- maxLocality: TaskLocality.TaskLocality)
- : Option[TaskDescription] =
- {
- if (tasksSuccessful + runningTasks < numTasks) {
- increaseRunningTasks(1)
- return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
- }
- return None
- }
-
- override def checkSpeculatableTasks(): Boolean = {
- return true
- }
-
- def taskFinished() {
- decreaseRunningTasks(1)
- tasksSuccessful +=1
- if (tasksSuccessful == numTasks) {
- parent.removeSchedulable(this)
- }
- }
-
- def abort() {
- decreaseRunningTasks(runningTasks)
- parent.removeSchedulable(this)
- }
-}
-
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
-
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
- new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
- }
-
- def resourceOffer(rootPool: Pool): Int = {
- val taskSetQueue = rootPool.getSortedTaskSetQueue()
- /* Just for Test*/
- for (manager <- taskSetQueue) {
- logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
- manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
- }
- for (taskSet <- taskSetQueue) {
- taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
- case Some(task) =>
- return taskSet.stageId
- case None => {}
- }
- }
- -1
- }
-
- def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
- assert(resourceOffer(rootPool) === expectedTaskSetId)
- }
-
- test("FIFO Scheduler Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new FakeTask(0)
- tasks += task
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
- val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
- val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
- schedulableBuilder.buildPools()
-
- val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
- val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
- val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
- schedulableBuilder.addTaskSetManager(taskSetManager0, null)
- schedulableBuilder.addTaskSetManager(taskSetManager1, null)
- schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-
- checkTaskSetId(rootPool, 0)
- resourceOffer(rootPool)
- checkTaskSetId(rootPool, 1)
- resourceOffer(rootPool)
- taskSetManager1.abort()
- checkTaskSetId(rootPool, 2)
- }
-
- test("Fair Scheduler Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new FakeTask(0)
- tasks += task
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
- val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.scheduler.allocation.file", xmlPath)
- val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool)
- schedulableBuilder.buildPools()
-
- assert(rootPool.getSchedulableByName("default") != null)
- assert(rootPool.getSchedulableByName("1") != null)
- assert(rootPool.getSchedulableByName("2") != null)
- assert(rootPool.getSchedulableByName("3") != null)
- assert(rootPool.getSchedulableByName("1").minShare === 2)
- assert(rootPool.getSchedulableByName("1").weight === 1)
- assert(rootPool.getSchedulableByName("2").minShare === 3)
- assert(rootPool.getSchedulableByName("2").weight === 1)
- assert(rootPool.getSchedulableByName("3").minShare === 0)
- assert(rootPool.getSchedulableByName("3").weight === 1)
-
- val properties1 = new Properties()
- properties1.setProperty("spark.scheduler.pool","1")
- val properties2 = new Properties()
- properties2.setProperty("spark.scheduler.pool","2")
-
- val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
- val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
- val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
- schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
- schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
- schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-
- val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
- val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
- schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
- schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
-
- checkTaskSetId(rootPool, 0)
- checkTaskSetId(rootPool, 3)
- checkTaskSetId(rootPool, 3)
- checkTaskSetId(rootPool, 1)
- checkTaskSetId(rootPool, 4)
- checkTaskSetId(rootPool, 2)
- checkTaskSetId(rootPool, 2)
- checkTaskSetId(rootPool, 4)
-
- taskSetManager12.taskFinished()
- assert(rootPool.getSchedulableByName("1").runningTasks === 3)
- taskSetManager24.abort()
- assert(rootPool.getSchedulableByName("2").runningTasks === 2)
- }
-
- test("Nested Pool Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new FakeTask(0)
- tasks += task
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
- val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
- val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
- rootPool.addSchedulable(pool0)
- rootPool.addSchedulable(pool1)
-
- val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
- val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
- pool0.addSchedulable(pool00)
- pool0.addSchedulable(pool01)
-
- val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
- val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
- pool1.addSchedulable(pool10)
- pool1.addSchedulable(pool11)
-
- val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
- val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
- pool00.addSchedulable(taskSetManager000)
- pool00.addSchedulable(taskSetManager001)
-
- val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
- val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
- pool01.addSchedulable(taskSetManager010)
- pool01.addSchedulable(taskSetManager011)
-
- val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
- val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
- pool10.addSchedulable(taskSetManager100)
- pool10.addSchedulable(taskSetManager101)
-
- val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
- val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
- pool11.addSchedulable(taskSetManager110)
- pool11.addSchedulable(taskSetManager111)
-
- checkTaskSetId(rootPool, 0)
- checkTaskSetId(rootPool, 4)
- checkTaskSetId(rootPool, 6)
- checkTaskSetId(rootPool, 2)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
deleted file mode 100644
index b97f2b1..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable
-
-import org.scalatest.FunSuite
-
-import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.executor.TaskMetrics
-import java.nio.ByteBuffer
-import org.apache.spark.util.{Utils, FakeClock}
-
-class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
- override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
- taskScheduler.startedTasks += taskInfo.index
- }
-
- override def taskEnded(
- task: Task[_],
- reason: TaskEndReason,
- result: Any,
- accumUpdates: mutable.Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) {
- taskScheduler.endedTasks(taskInfo.index) = reason
- }
-
- override def executorGained(execId: String, host: String) {}
-
- override def executorLost(execId: String) {}
-
- override def taskSetFailed(taskSet: TaskSet, reason: String) {
- taskScheduler.taskSetsFailed += taskSet.id
- }
-}
-
-/**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
- * feedback received from the TaskSetManagers. Note that it's important to initialize this with
- * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
- */
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
- extends ClusterScheduler(sc)
-{
- val startedTasks = new ArrayBuffer[Long]
- val endedTasks = new mutable.HashMap[Long, TaskEndReason]
- val finishedManagers = new ArrayBuffer[TaskSetManager]
- val taskSetsFailed = new ArrayBuffer[String]
-
- val executors = new mutable.HashMap[String, String] ++ liveExecutors
-
- dagScheduler = new FakeDAGScheduler(this)
-
- def removeExecutor(execId: String): Unit = executors -= execId
-
- override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
-
- override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
-
- override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
-}
-
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
- import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
-
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
-
- test("TaskSet with no preferences") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
- val taskSet = createTaskSet(1)
- val manager = new ClusterTaskSetManager(sched, taskSet)
-
- // Offer a host with no CPUs
- assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
-
- // Offer a host with process-local as the constraint; this should work because the TaskSet
- // above won't have any locality preferences
- val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
- assert(taskOption.isDefined)
- val task = taskOption.get
- assert(task.executorId === "exec1")
- assert(sched.startedTasks.contains(0))
-
- // Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
-
- // Tell it the task has finished
- manager.handleSuccessfulTask(0, createTaskResult(0))
- assert(sched.endedTasks(0) === Success)
- assert(sched.finishedManagers.contains(manager))
- }
-
- test("multiple offers with no preferences") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
- val taskSet = createTaskSet(3)
- val manager = new ClusterTaskSetManager(sched, taskSet)
-
- // First three offers should all find tasks
- for (i <- 0 until 3) {
- val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
- assert(taskOption.isDefined)
- val task = taskOption.get
- assert(task.executorId === "exec1")
- }
- assert(sched.startedTasks.toSet === Set(0, 1, 2))
-
- // Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
- // Finish the first two tasks
- manager.handleSuccessfulTask(0, createTaskResult(0))
- manager.handleSuccessfulTask(1, createTaskResult(1))
- assert(sched.endedTasks(0) === Success)
- assert(sched.endedTasks(1) === Success)
- assert(!sched.finishedManagers.contains(manager))
-
- // Finish the last task
- manager.handleSuccessfulTask(2, createTaskResult(2))
- assert(sched.endedTasks(2) === Success)
- assert(sched.finishedManagers.contains(manager))
- }
-
- test("basic delay scheduling") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
- val taskSet = createTaskSet(4,
- Seq(TaskLocation("host1", "exec1")),
- Seq(TaskLocation("host2", "exec2")),
- Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
- Seq() // Last task has no locality prefs
- )
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // First offer host1, exec1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
-
- // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
- // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
- assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
-
- // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
-
- // Offer host1, exec1 again, at ANY level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // Offer host1, exec1 again, at ANY level: task 1 should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
- // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
- }
-
- test("delay scheduling with fallback") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc,
- ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
- val taskSet = createTaskSet(5,
- Seq(TaskLocation("host1")),
- Seq(TaskLocation("host2")),
- Seq(TaskLocation("host2")),
- Seq(TaskLocation("host3")),
- Seq(TaskLocation("host2"))
- )
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // First offer host1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Offer host1 again: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // Offer host1 again: second task (on host2) should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
- // Offer host1 again: third task (on host2) should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
-
- // Offer host2: fifth task (also on host2) should get chosen
- assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
-
- // Now that we've launched a local task, we should no longer launch the task for host3
- assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // After another delay, we can go ahead and launch that task non-locally
- assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
- }
-
- test("delay scheduling with failed hosts") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
- val taskSet = createTaskSet(3,
- Seq(TaskLocation("host1")),
- Seq(TaskLocation("host2")),
- Seq(TaskLocation("host3"))
- )
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // First offer host1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Offer host1 again: third task should be chosen immediately because host3 is not up
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
-
- // After this, nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
- // Now mark host2 as dead
- sched.removeExecutor("exec2")
- manager.executorLost("exec2", "host2")
-
- // Task 1 should immediately be launched on host1 because its original host is gone
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
- // Now that all tasks have launched, nothing new should be launched anywhere else
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
- assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
- }
-
- test("task result lost") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
- val taskSet = createTaskSet(1)
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Tell it the task has finished but the result was lost.
- manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
- assert(sched.endedTasks(0) === TaskResultLost)
-
- // Re-offer the host -- now we should get task 0 again.
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
- }
-
- test("repeated failures lead to task set abortion") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
- val taskSet = createTaskSet(1)
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
- // after the last failure.
- (0 until manager.MAX_TASK_FAILURES).foreach { index =>
- val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
- assert(offerResult.isDefined,
- "Expect resource offer on iteration %s to return a task".format(index))
- assert(offerResult.get.index === 0)
- manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
- // The set is aborted only once the final allowed failure has occurred.
- if (index < manager.MAX_TASK_FAILURES - 1) {
- assert(!sched.taskSetsFailed.contains(taskSet.id))
- } else {
- assert(sched.taskSetsFailed.contains(taskSet.id))
- }
- }
- }
-
- /**
- * Utility method to create a TaskSet, optionally with a sequence of preferred
- * locations for each task (given as varargs). If no locations are given, all
- * tasks are created with empty preferences.
- */
- def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
- if (prefLocs.size != 0 && prefLocs.size != numTasks) {
- throw new IllegalArgumentException("Wrong number of task locations")
- }
- val tasks = Array.tabulate[Task[_]](numTasks) { i =>
- new FakeTask(i, if (prefLocs.nonEmpty) prefLocs(i) else Nil)
- }
- new TaskSet(tasks, 0, 0, 0, null)
- }
-
- def createTaskResult(id: Int): DirectTaskResult[Int] = {
- new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
- }
-}
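
The delay-scheduling tests above advance time with clock.advance(LOCALITY_WAIT)
instead of sleeping, which is what keeps them fast and deterministic. The
FakeClock they pass to ClusterTaskSetManager is not part of this diff; a
minimal sketch of such a manual clock, assuming a Clock trait with a single
getTime method (both names are assumptions, not taken from this commit):

// Manually advanced clock for deterministic scheduler tests (sketch only;
// the Clock trait and getTime signature are assumed).
trait Clock {
  def getTime(): Long
}

class FakeClock extends Clock {
  private var time = 0L

  // Step the clock forward, e.g. past spark.locality.wait.
  def advance(millis: Long) {
    time += millis
  }

  override def getTime(): Long = time
}

Because the manager reads the current time through this interface, a test can
cross a locality-wait boundary instantly rather than sleeping for the full
spark.locality.wait period.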
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
deleted file mode 100644
index 0f01515..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
-
-class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
- override def runTask(context: TaskContext): Int = 0
-
- override def preferredLocations: Seq[TaskLocation] = prefLocs
-}
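
FakeTask keeps runTask trivial so the scheduler suites can exercise locality
and failure handling without running real work. As a usage sketch (not code
from this commit), a hypothetical variant that counts invocations shows how
little extra is needed when a test also wants to observe actual executions:

import org.apache.spark.TaskContext
import org.apache.spark.scheduler.{Task, TaskLocation}

// Hypothetical FakeTask variant that records how often it runs.
class CountingFakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil)
  extends Task[Int](stageId, 0) {

  @volatile var runs = 0

  override def runTask(context: TaskContext): Int = {
    runs += 1
    runs
  }

  override def preferredLocations: Seq[TaskLocation] = prefLocs
}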
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a124658e/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
deleted file mode 100644
index 77d3038..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.nio.ByteBuffer
-
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
-import org.apache.spark.storage.TaskResultBlockId
-
-/**
- * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
- *
- * Used to test the case where a BlockManager evicts the task result (or dies) before the
- * TaskResult is retrieved.
- */
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
- extends TaskResultGetter(sparkEnv, scheduler) {
- var removedResult = false
-
- override def enqueueSuccessfulTask(
- taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
- if (!removedResult) {
- // Only remove the result once, since we'd like to test the case where the task eventually
- // succeeds.
- serializer.get().deserialize[TaskResult[_]](serializedData) match {
- case IndirectTaskResult(blockId) =>
- sparkEnv.blockManager.master.removeBlock(blockId)
- case directResult: DirectTaskResult[_] =>
- taskSetManager.abort("Internal error: expect only indirect results")
- }
- serializedData.rewind()
- removedResult = true
- }
- super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
- }
-}
-
-/**
- * Tests related to handling task results (both direct and indirect).
- */
-class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
- with LocalSparkContext {
-
- override def beforeAll {
- // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
- // as we can make it) so the tests don't take too long.
- System.setProperty("spark.akka.frameSize", "1")
- }
-
- before {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll {
- System.clearProperty("spark.akka.frameSize")
- }
-
- test("handling results smaller than Akka frame size") {
- val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
- assert(result === 2)
- }
-
- test("handling results larger than Akka frame size") {
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
- assert(result === 1.to(akkaFrameSize).toArray)
-
- val RESULT_BLOCK_ID = TaskResultBlockId(0)
- assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
- "Expect result to be removed from the block manager.")
- }
-
- test("task retried if result missing from block manager") {
- // If this test hangs, it's probably because no resource offers were made after the task
- // failed.
- val scheduler: ClusterScheduler = sc.taskScheduler match {
- case clusterScheduler: ClusterScheduler =>
- clusterScheduler
- case _ =>
- assert(false, "Expect local cluster to use ClusterScheduler")
- throw new ClassCastException // unreachable; satisfies the match's expected result type
- }
- scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
- assert(result === 1.to(akkaFrameSize).toArray)
-
- // Make sure two tasks were run (one failed one, and a second retried one).
- assert(scheduler.nextTaskId.get() === 2)
- }
-}
-
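
The three tests in this suite hinge on how results travel back to the driver:
inline as a DirectTaskResult when the serialized bytes fit in an Akka frame,
or as an IndirectTaskResult naming a TaskResultBlockId in the block manager
when they do not. A simplified sketch of the fetch-and-retry path that "task
retried if result missing from block manager" exercises (the helper name and
wiring are paraphrased; only the type names and the handleFailedTask call
appear in this diff):

import org.apache.spark.{SparkEnv, TaskResultLost, TaskState}
import org.apache.spark.scheduler.DirectTaskResult
import org.apache.spark.scheduler.cluster.ClusterTaskSetManager
import org.apache.spark.storage.TaskResultBlockId

// Hypothetical helper mirroring what a TaskResultGetter does for an
// indirect result: fetch the block, or report TaskResultLost.
def fetchIndirectResult(
    manager: ClusterTaskSetManager,
    tid: Long,
    blockId: TaskResultBlockId): Unit = {
  SparkEnv.get.blockManager.getRemoteBytes(blockId) match {
    case Some(bytes) =>
      // Block still present: deserialize it and hand back a success.
      val result = SparkEnv.get.serializer.newInstance()
        .deserialize[DirectTaskResult[_]](bytes)
      // (delivery to the scheduler elided)
    case None =>
      // Block evicted or its manager gone: mark the attempt failed with
      // TaskResultLost, so the TaskSetManager re-offers the task. That
      // re-offer is why the suite sees nextTaskId reach 2.
      manager.handleFailedTask(tid, TaskState.FINISHED, Some(TaskResultLost))
  }
}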