Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:49 UTC
[05/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
deleted file mode 100644
index d1276d5..0000000
--- a/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import scala.collection.immutable.NumericRange
-
-import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-
-class ParallelCollectionSplitSuite extends FunSuite with Checkers {
- test("one element per slice") {
- val data = Array(1, 2, 3)
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices(0).mkString(",") === "1")
- assert(slices(1).mkString(",") === "2")
- assert(slices(2).mkString(",") === "3")
- }
-
- test("one slice") {
- val data = Array(1, 2, 3)
- val slices = ParallelCollectionRDD.slice(data, 1)
- assert(slices.size === 1)
- assert(slices(0).mkString(",") === "1,2,3")
- }
-
- test("equal slices") {
- val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices(0).mkString(",") === "1,2,3")
- assert(slices(1).mkString(",") === "4,5,6")
- assert(slices(2).mkString(",") === "7,8,9")
- }
-
- test("non-equal slices") {
- val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices(0).mkString(",") === "1,2,3")
- assert(slices(1).mkString(",") === "4,5,6")
- assert(slices(2).mkString(",") === "7,8,9,10")
- }
-
- test("splitting exclusive range") {
- val data = 0 until 100
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices(0).mkString(",") === (0 to 32).mkString(","))
- assert(slices(1).mkString(",") === (33 to 65).mkString(","))
- assert(slices(2).mkString(",") === (66 to 99).mkString(","))
- }
-
- test("splitting inclusive range") {
- val data = 0 to 100
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices(0).mkString(",") === (0 to 32).mkString(","))
- assert(slices(1).mkString(",") === (33 to 66).mkString(","))
- assert(slices(2).mkString(",") === (67 to 100).mkString(","))
- }
-
- test("empty data") {
- val data = new Array[Int](0)
- val slices = ParallelCollectionRDD.slice(data, 5)
- assert(slices.size === 5)
- for (slice <- slices) assert(slice.size === 0)
- }
-
- test("zero slices") {
- val data = Array(1, 2, 3)
- intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) }
- }
-
- test("negative number of slices") {
- val data = Array(1, 2, 3)
- intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) }
- }
-
- test("exclusive ranges sliced into ranges") {
- val data = 1 until 100
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 99)
- assert(slices.forall(_.isInstanceOf[Range]))
- }
-
- test("inclusive ranges sliced into ranges") {
- val data = 1 to 100
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 100)
- assert(slices.forall(_.isInstanceOf[Range]))
- }
-
- test("large ranges don't overflow") {
- val N = 100 * 1000 * 1000
- val data = 0 until N
- val slices = ParallelCollectionRDD.slice(data, 40)
- assert(slices.size === 40)
- for (i <- 0 until 40) {
- assert(slices(i).isInstanceOf[Range])
- val range = slices(i).asInstanceOf[Range]
- assert(range.start === i * (N / 40), "slice " + i + " start")
- assert(range.end === (i+1) * (N / 40), "slice " + i + " end")
- assert(range.step === 1, "slice " + i + " step")
- }
- }
-
- test("random array tests") {
- val gen = for {
- d <- arbitrary[List[Int]]
- n <- Gen.choose(1, 100)
- } yield (d, n)
- val prop = forAll(gen) {
- (tuple: (List[Int], Int)) =>
- val d = tuple._1
- val n = tuple._2
- val slices = ParallelCollectionRDD.slice(d, n)
- ("n slices" |: slices.size == n) &&
- ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
- ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
- }
- check(prop)
- }
-
- test("random exclusive range tests") {
- val gen = for {
- a <- Gen.choose(-100, 100)
- b <- Gen.choose(-100, 100)
- step <- Gen.choose(-5, 5) suchThat (_ != 0)
- n <- Gen.choose(1, 100)
- } yield (a until b by step, n)
- val prop = forAll(gen) {
- case (d: Range, n: Int) =>
- val slices = ParallelCollectionRDD.slice(d, n)
- ("n slices" |: slices.size == n) &&
- ("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
- ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
- ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
- }
- check(prop)
- }
-
- test("random inclusive range tests") {
- val gen = for {
- a <- Gen.choose(-100, 100)
- b <- Gen.choose(-100, 100)
- step <- Gen.choose(-5, 5) suchThat (_ != 0)
- n <- Gen.choose(1, 100)
- } yield (a to b by step, n)
- val prop = forAll(gen) {
- case (d: Range, n: Int) =>
- val slices = ParallelCollectionRDD.slice(d, n)
- ("n slices" |: slices.size == n) &&
- ("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
- ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
- ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
- }
- check(prop)
- }
-
- test("exclusive ranges of longs") {
- val data = 1L until 100L
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 99)
- assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
- }
-
- test("inclusive ranges of longs") {
- val data = 1L to 100L
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 100)
- assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
- }
-
- test("exclusive ranges of doubles") {
- val data = 1.0 until 100.0 by 1.0
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 99)
- assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
- }
-
- test("inclusive ranges of doubles") {
- val data = 1.0 to 100.0 by 1.0
- val slices = ParallelCollectionRDD.slice(data, 3)
- assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 100)
- assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
- }
-}
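
The boundary arithmetic the suite above exercises can be stated compactly: slice i of an n-way split covers indices i * length / n until (i + 1) * length / n, so any remainder accumulates in the later slices (sizes 33, 33, 34 for a 100-element range split three ways). Below is a minimal, self-contained sketch of that rule, assuming plain Seq inputs; the tests above show the production code additionally keeps Range and NumericRange inputs as ranges, which this sketch does not attempt. SliceSketch and its members are illustrative names, not Spark API.

    object SliceSketch {
      // Slice i covers indices [i * length / n, (i + 1) * length / n); Long math avoids
      // the overflow that the "large ranges don't overflow" test guards against.
      private def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
        (0 until numSlices).map { i =>
          ((i * length / numSlices).toInt, ((i + 1) * length / numSlices).toInt)
        }

      def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
        require(numSlices > 0, "Positive number of slices required.")
        positions(seq.length, numSlices).map { case (start, end) => seq.slice(start, end) }
      }

      def main(args: Array[String]): Unit = {
        println(slice(0 until 100, 3).map(_.size))       // Vector(33, 33, 34)
        println(slice(1 to 10, 3).map(_.mkString(",")))  // slices 1,2,3 / 4,5,6 / 7,8,9,10
      }
    }
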
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
deleted file mode 100644
index 3b4a0d5..0000000
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import scala.collection.mutable.{Map, HashMap}
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import spark.LocalSparkContext
-import spark.MapOutputTracker
-import spark.RDD
-import spark.SparkContext
-import spark.Partition
-import spark.TaskContext
-import spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import spark.{FetchFailed, Success, TaskEndReason}
-import spark.storage.{BlockManagerId, BlockManagerMaster}
-
-import spark.scheduler.cluster.Pool
-import spark.scheduler.cluster.SchedulingMode
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-
-/**
- * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
- * rather than spawning an event loop thread as happens in the real code. They use EasyMock
- * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
- * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead
- * host notifications are sent). In addition, tests may check for side effects on a non-mocked
- * MapOutputTracker instance.
- *
- * Tests primarily consist of running DAGScheduler#processEvent and
- * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet)
- * and capturing the resulting TaskSets from the mock TaskScheduler.
- */
-class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
- /** Set of TaskSets the DAGScheduler has requested executed. */
- val taskSets = scala.collection.mutable.Buffer[TaskSet]()
- val taskScheduler = new TaskScheduler() {
- override def rootPool: Pool = null
- override def schedulingMode: SchedulingMode = SchedulingMode.NONE
- override def start() = {}
- override def stop() = {}
- override def submitTasks(taskSet: TaskSet) = {
- // normally done by TaskSetManager
- taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
- taskSets += taskSet
- }
- override def setListener(listener: TaskSchedulerListener) = {}
- override def defaultParallelism() = 2
- }
-
- var mapOutputTracker: MapOutputTracker = null
- var scheduler: DAGScheduler = null
-
- /**
- * Set of cache locations to return from our mock BlockManagerMaster.
- * Keys are (rdd ID, partition ID). Anything not present will return an empty
- * list of cache locations silently.
- */
- val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
- // stub out BlockManagerMaster.getLocations to use our cacheLocations
- val blockManagerMaster = new BlockManagerMaster(null) {
- override def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
- blockIds.map { name =>
- val pieces = name.split("_")
- if (pieces(0) == "rdd") {
- val key = pieces(1).toInt -> pieces(2).toInt
- cacheLocations.getOrElse(key, Seq())
- } else {
- Seq()
- }
- }.toSeq
- }
- override def removeExecutor(execId: String) {
- // don't need to propagate to the driver, which we don't have
- }
- }
-
- /** The list of results that DAGScheduler has collected. */
- val results = new HashMap[Int, Any]()
- var failure: Exception = _
- val listener = new JobListener() {
- override def taskSucceeded(index: Int, result: Any) = results.put(index, result)
- override def jobFailed(exception: Exception) = { failure = exception }
- }
-
- before {
- sc = new SparkContext("local", "DAGSchedulerSuite")
- taskSets.clear()
- cacheLocations.clear()
- results.clear()
- mapOutputTracker = new MapOutputTracker()
- scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) {
- override def runLocally(job: ActiveJob) {
- // don't bother with the thread while unit testing
- runLocallyWithinThread(job)
- }
- }
- }
-
- after {
- scheduler.stop()
- }
-
- /**
- * Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
- * This is a pair RDD type so it can always be used in ShuffleDependencies.
- */
- type MyRDD = RDD[(Int, Int)]
-
- /**
- * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
- * preferredLocations (if any) that are passed to them. They are deliberately not executable
- * so we can test that DAGScheduler does not try to execute RDDs locally.
- */
- private def makeRdd(
- numPartitions: Int,
- dependencies: List[Dependency[_]],
- locations: Seq[Seq[String]] = Nil
- ): MyRDD = {
- val maxPartition = numPartitions - 1
- return new MyRDD(sc, dependencies) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- throw new RuntimeException("should not be reached")
- override def getPartitions = (0 to maxPartition).map(i => new Partition {
- override def index = i
- }).toArray
- override def getPreferredLocations(split: Partition): Seq[String] =
- if (locations.isDefinedAt(split.index))
- locations(split.index)
- else
- Nil
- override def toString: String = "DAGSchedulerSuiteRDD " + id
- }
- }
-
- /**
- * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
- * the scheduler not to exit.
- *
- * After processing the event, submit waiting stages as is done on most iterations of the
- * DAGScheduler event loop.
- */
- private def runEvent(event: DAGSchedulerEvent) {
- assert(!scheduler.processEvent(event))
- scheduler.submitWaitingStages()
- }
-
- /**
- * When we submit dummy Jobs, this is the compute function we supply. Except in a local test
- * below, we do not expect this function to ever be executed; instead, we will return results
- * directly through CompletionEvents.
- */
- private val jobComputeFunc = (context: TaskContext, it: Iterator[(_)]) =>
- it.next.asInstanceOf[Tuple2[_, _]]._1
-
- /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
- private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
- assert(taskSet.tasks.size >= results.size)
- for ((result, i) <- results.zipWithIndex) {
- if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
- }
- }
- }
-
- /** Sends the rdd to the scheduler for scheduling. */
- private def submit(
- rdd: RDD[_],
- partitions: Array[Int],
- func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
- allowLocal: Boolean = false,
- listener: JobListener = listener) {
- runEvent(JobSubmitted(rdd, func, partitions, allowLocal, null, listener))
- }
-
- /** Sends TaskSetFailed to the scheduler. */
- private def failed(taskSet: TaskSet, message: String) {
- runEvent(TaskSetFailed(taskSet, message))
- }
-
- test("zero split job") {
- val rdd = makeRdd(0, Nil)
- var numResults = 0
- val fakeListener = new JobListener() {
- override def taskSucceeded(partition: Int, value: Any) = numResults += 1
- override def jobFailed(exception: Exception) = throw exception
- }
- submit(rdd, Array(), listener = fakeListener)
- assert(numResults === 0)
- }
-
- test("run trivial job") {
- val rdd = makeRdd(1, Nil)
- submit(rdd, Array(0))
- complete(taskSets(0), List((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("local job") {
- val rdd = new MyRDD(sc, Nil) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- Array(42 -> 0).iterator
- override def getPartitions = Array( new Partition { override def index = 0 } )
- override def getPreferredLocations(split: Partition) = Nil
- override def toString = "DAGSchedulerSuite Local RDD"
- }
- runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener))
- assert(results === Map(0 -> 42))
- }
-
- test("run trivial job w/ dependency") {
- val baseRdd = makeRdd(1, Nil)
- val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
- submit(finalRdd, Array(0))
- complete(taskSets(0), Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("cache location preferences w/ dependency") {
- val baseRdd = makeRdd(1, Nil)
- val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
- cacheLocations(baseRdd.id -> 0) =
- Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
- submit(finalRdd, Array(0))
- val taskSet = taskSets(0)
- assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
- complete(taskSet, Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("trivial job failure") {
- submit(makeRdd(1, Nil), Array(0))
- failed(taskSets(0), "some failure")
- assert(failure.getMessage === "Job failed: some failure")
- }
-
- test("run trivial shuffle") {
- val shuffleMapRdd = makeRdd(2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(1, List(shuffleDep))
- submit(reduceRdd, Array(0))
- complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
- assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
- Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
- complete(taskSets(1), Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("run trivial shuffle with fetch failure") {
- val shuffleMapRdd = makeRdd(2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(2, List(shuffleDep))
- submit(reduceRdd, Array(0, 1))
- complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
- // the 2nd ResultTask failed
- complete(taskSets(1), Seq(
- (Success, 42),
- (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
- // this will get called
- // blockManagerMaster.removeExecutor("exec-hostA")
- // ask the scheduler to try it again
- scheduler.resubmitFailedStages()
- // have the 2nd attempt pass
- complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
- // we can see both result blocks now
- assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
- complete(taskSets(3), Seq((Success, 43)))
- assert(results === Map(0 -> 42, 1 -> 43))
- }
-
- test("ignore late map task completions") {
- val shuffleMapRdd = makeRdd(2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(2, List(shuffleDep))
- submit(reduceRdd, Array(0, 1))
- // pretend we were told hostA went away
- val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
- val newEpoch = mapOutputTracker.getEpoch
- assert(newEpoch > oldEpoch)
- val noAccum = Map[Long, Any]()
- val taskSet = taskSets(0)
- // should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
- // should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
- // should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
- // should work because it's a new epoch
- taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
- assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
- Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
- complete(taskSets(1), Seq((Success, 42), (Success, 43)))
- assert(results === Map(0 -> 42, 1 -> 43))
- }
-
- test("run trivial shuffle with out-of-band failure and retry") {
- val shuffleMapRdd = makeRdd(2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(1, List(shuffleDep))
- submit(reduceRdd, Array(0))
- // blockManagerMaster.removeExecutor("exec-hostA")
- // pretend we were told hostA went away
- runEvent(ExecutorLost("exec-hostA"))
- // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
- // rather than marking it as failed and waiting.
- complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
- // have hostC complete the resubmitted task
- complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
- assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
- Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
- complete(taskSets(2), Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("recursive shuffle failures") {
- val shuffleOneRdd = makeRdd(2, Nil)
- val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
- val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = makeRdd(1, List(shuffleDepTwo))
- submit(finalRdd, Array(0))
- // have the first stage complete normally
- complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 2)),
- (Success, makeMapStatus("hostB", 2))))
- // have the second stage complete normally
- complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostC", 1))))
- // fail the third stage because hostA went down
- complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
- // TODO assert this:
- // blockManagerMaster.removeExecutor("exec-hostA")
- // have DAGScheduler try again
- scheduler.resubmitFailedStages()
- complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
- complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
- complete(taskSets(5), Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("cached post-shuffle") {
- val shuffleOneRdd = makeRdd(2, Nil)
- val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
- val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = makeRdd(1, List(shuffleDepTwo))
- submit(finalRdd, Array(0))
- cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
- cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
- // complete stage 2
- complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 2)),
- (Success, makeMapStatus("hostB", 2))))
- // complete stage 1
- complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
- // pretend stage 0 failed because hostA went down
- complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
- // TODO assert this:
- // blockManagerMaster.removeExecutor("exec-hostA")
- // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
- scheduler.resubmitFailedStages()
- assertLocations(taskSets(3), Seq(Seq("hostD")))
- // allow hostD to recover
- complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
- complete(taskSets(4), Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- /**
- * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
- * Note that this checks only the host and not the executor ID.
- */
- private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
- assert(hosts.size === taskSet.tasks.size)
- for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
- assert(taskLocs.map(_.host) === expectedLocs)
- }
- }
-
- private def makeMapStatus(host: String, reduces: Int): MapStatus =
- new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
-
- private def makeBlockManagerId(host: String): BlockManagerId =
- BlockManagerId("exec-" + host, host, 12345, 0)
-
-}
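
One detail worth noting from the stub above: cacheLocations is keyed by (rdd ID, partition ID), and the overridden getLocations recovers that key from the cached-block naming convention "rdd_<rddId>_<partitionId>". A standalone sketch of that lookup, with rddBlockKey as a hypothetical helper name:

    // Parse a block name into an (rdd ID, partition ID) key, mirroring the stub above;
    // names that are not cached RDD partitions yield no key (and hence no locations).
    def rddBlockKey(blockId: String): Option[(Int, Int)] =
      blockId.split("_") match {
        case Array("rdd", rddId, split) => Some(rddId.toInt -> split.toInt)
        case _                          => None
      }

    // rddBlockKey("rdd_3_1")       == Some((3, 1))
    // rddBlockKey("shuffle_0_1_2") == None
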
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
deleted file mode 100644
index bb9e715..0000000
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.Properties
-import java.util.concurrent.LinkedBlockingQueue
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-import scala.collection.mutable
-import spark._
-import spark.SparkContext._
-
-
-class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
-
- test("inner method") {
- sc = new SparkContext("local", "joblogger")
- val joblogger = new JobLogger {
- def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
- def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
- def getRddNameTest(rdd: RDD[_]) = getRddName(rdd)
- def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
- }
- type MyRDD = RDD[(Int, Int)]
- def makeRdd(
- numPartitions: Int,
- dependencies: List[Dependency[_]]
- ): MyRDD = {
- val maxPartition = numPartitions - 1
- return new MyRDD(sc, dependencies) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- throw new RuntimeException("should not be reached")
- override def getPartitions = (0 to maxPartition).map(i => new Partition {
- override def index = i
- }).toArray
- }
- }
- val jobID = 5
- val parentRdd = makeRdd(4, Nil)
- val shuffleDep = new ShuffleDependency(parentRdd, null)
- val rootRdd = makeRdd(4, List(shuffleDep))
- val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
- val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
-
- joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
- joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
- parentRdd.setName("MyRDD")
- joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
- joblogger.createLogWriterTest(jobID)
- joblogger.getJobIDtoPrintWriter.size should be (1)
- joblogger.buildJobDepTest(jobID, rootStage)
- joblogger.getJobIDToStages.get(jobID).get.size should be (2)
- joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
- joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
- joblogger.closeLogWriterTest(jobID)
- joblogger.getStageIDToJobID.size should be (0)
- joblogger.getJobIDToStages.size should be (0)
- joblogger.getJobIDtoPrintWriter.size should be (0)
- }
-
- test("inner variables") {
- sc = new SparkContext("local[4]", "joblogger")
- val joblogger = new JobLogger {
- override protected def closeLogWriter(jobID: Int) =
- getJobIDtoPrintWriter.get(jobID).foreach { fileWriter =>
- fileWriter.close()
- }
- }
- sc.addSparkListener(joblogger)
- val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
- rdd.reduceByKey(_+_).collect()
-
- joblogger.getLogDir should be ("/tmp/spark")
- joblogger.getJobIDtoPrintWriter.size should be (1)
- joblogger.getStageIDToJobID.size should be (2)
- joblogger.getStageIDToJobID.get(0) should be (Some(0))
- joblogger.getStageIDToJobID.get(1) should be (Some(0))
- joblogger.getJobIDToStages.size should be (1)
- }
-
-
- test("interface functions") {
- sc = new SparkContext("local[4]", "joblogger")
- val joblogger = new JobLogger {
- var onTaskEndCount = 0
- var onJobEndCount = 0
- var onJobStartCount = 0
- var onStageCompletedCount = 0
- var onStageSubmittedCount = 0
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
- override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
- override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
- }
- sc.addSparkListener(joblogger)
- val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
- rdd.reduceByKey(_+_).collect()
-
- joblogger.onJobStartCount should be (1)
- joblogger.onJobEndCount should be (1)
- joblogger.onTaskEndCount should be (8)
- joblogger.onStageSubmittedCount should be (2)
- joblogger.onStageCompletedCount should be (2)
- }
-}
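
For reference, the counts asserted in "interface functions" follow from the job shape rather than anything JobLogger-specific: parallelize(1 to 1e2.toInt, 4).map(...).reduceByKey(_+_).collect() runs as one job with two stages (the shuffle-map stage and the result stage) of four tasks each, hence 1 job start and end, 2 stage submissions, 2 stage completions, and 8 task-end events.
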
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
deleted file mode 100644
index 392d67d..0000000
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import org.scalatest.FunSuite
-import spark.{SparkContext, LocalSparkContext}
-import scala.collection.mutable
-import org.scalatest.matchers.ShouldMatchers
-import spark.SparkContext._
-
-/**
- *
- */
-
-class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
-
- test("local metrics") {
- sc = new SparkContext("local[4]", "test")
- val listener = new SaveStageInfo
- sc.addSparkListener(listener)
- sc.addSparkListener(new StatsReportListener)
- //just to make sure some of the tasks take a noticeable amount of time
- val w = {i:Int =>
- if (i == 0)
- Thread.sleep(100)
- i
- }
-
- val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
- d.count
- listener.stageInfos.size should be (1)
-
- val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
-
- val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2")
-
- val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)}
- d4.setName("A Cogroup")
-
- d4.collectAsMap
-
- listener.stageInfos.size should be (4)
- listener.stageInfos.foreach {stageInfo =>
- //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
- checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
- checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime")
- checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime")
- if (stageInfo.stage.rdd.name == d4.name) {
- checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
- }
-
- stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
- taskMetrics.resultSize should be > (0l)
- if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
- taskMetrics.shuffleWriteMetrics should be ('defined)
- taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
- }
- if (stageInfo.stage.rdd.name == d4.name) {
- taskMetrics.shuffleReadMetrics should be ('defined)
- val sm = taskMetrics.shuffleReadMetrics.get
- sm.totalBlocksFetched should be > (0)
- sm.localBlocksFetched should be > (0)
- sm.remoteBlocksFetched should be (0)
- sm.remoteBytesRead should be (0l)
- sm.remoteFetchTime should be (0l)
- }
- }
- }
- }
-
- def checkNonZeroAvg(m: Traversable[Long], msg: String) {
- assert(m.sum / m.size.toDouble > 0.0, msg)
- }
-
- def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
- val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name}
- !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty
- }
-
- class SaveStageInfo extends SparkListener {
- val stageInfos = mutable.Buffer[StageInfo]()
- override def onStageCompleted(stage: StageCompleted) {
- stageInfos += stage.stageInfo
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
deleted file mode 100644
index 95a6eee..0000000
--- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-import spark.TaskContext
-import spark.RDD
-import spark.SparkContext
-import spark.Partition
-import spark.LocalSparkContext
-
-class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
- test("Calls executeOnCompleteCallbacks after failure") {
- var completed = false
- sc = new SparkContext("local", "test")
- val rdd = new RDD[String](sc, List()) {
- override def getPartitions = Array[Partition](StubPartition(0))
- override def compute(split: Partition, context: TaskContext) = {
- context.addOnCompleteCallback(() => completed = true)
- sys.error("failed")
- }
- }
- val func = (c: TaskContext, i: Iterator[String]) => i.next
- val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
- intercept[RuntimeException] {
- task.run(0)
- }
- assert(completed === true)
- }
-
- case class StubPartition(val index: Int) extends Partition
-}
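
The contract the test above captures is that callbacks registered on a TaskContext run even when the task body throws. A minimal sketch of that pattern with a stand-in type (CallbackContext is illustrative, not Spark's TaskContext):

    class CallbackContext {
      private var callbacks: List[() => Unit] = Nil
      def addOnCompleteCallback(f: () => Unit): Unit = callbacks ::= f
      // Run the task body; the finally clause fires every callback whether the body
      // returns normally or throws, which is what the assertion above relies on.
      def runWithCallbacks[T](body: => T): T =
        try body finally callbacks.foreach(cb => cb())
    }
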
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
deleted file mode 100644
index abfdabf..0000000
--- a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster._
-import scala.collection.mutable.ArrayBuffer
-
-import java.util.Properties
-
-class FakeTaskSetManager(
- initPriority: Int,
- initStageId: Int,
- initNumTasks: Int,
- clusterScheduler: ClusterScheduler,
- taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler, taskSet) {
-
- parent = null
- weight = 1
- minShare = 2
- runningTasks = 0
- priority = initPriority
- stageId = initStageId
- name = "TaskSet_"+stageId
- override val numTasks = initNumTasks
- tasksFinished = 0
-
- override def increaseRunningTasks(taskNum: Int) {
- runningTasks += taskNum
- if (parent != null) {
- parent.increaseRunningTasks(taskNum)
- }
- }
-
- override def decreaseRunningTasks(taskNum: Int) {
- runningTasks -= taskNum
- if (parent != null) {
- parent.decreaseRunningTasks(taskNum)
- }
- }
-
- override def addSchedulable(schedulable: Schedulable) {
- }
-
- override def removeSchedulable(schedulable: Schedulable) {
- }
-
- override def getSchedulableByName(name: String): Schedulable = {
- return null
- }
-
- override def executorLost(executorId: String, host: String): Unit = {
- }
-
- override def resourceOffer(
- execId: String,
- host: String,
- availableCpus: Int,
- maxLocality: TaskLocality.TaskLocality)
- : Option[TaskDescription] =
- {
- if (tasksFinished + runningTasks < numTasks) {
- increaseRunningTasks(1)
- return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
- }
- return None
- }
-
- override def checkSpeculatableTasks(): Boolean = {
- return true
- }
-
- def taskFinished() {
- decreaseRunningTasks(1)
- tasksFinished +=1
- if (tasksFinished == numTasks) {
- parent.removeSchedulable(this)
- }
- }
-
- def abort() {
- decreaseRunningTasks(runningTasks)
- parent.removeSchedulable(this)
- }
-}
-
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
-
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
- new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
- }
-
- def resourceOffer(rootPool: Pool): Int = {
- val taskSetQueue = rootPool.getSortedTaskSetQueue()
- /* Just for Test*/
- for (manager <- taskSetQueue) {
- logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
- }
- for (taskSet <- taskSetQueue) {
- taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
- case Some(task) =>
- return taskSet.stageId
- case None => {}
- }
- }
- -1
- }
-
- def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
- assert(resourceOffer(rootPool) === expectedTaskSetId)
- }
-
- test("FIFO Scheduler Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new FakeTask(0)
- tasks += task
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
- val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
- val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
- schedulableBuilder.buildPools()
-
- val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
- val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
- val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
- schedulableBuilder.addTaskSetManager(taskSetManager0, null)
- schedulableBuilder.addTaskSetManager(taskSetManager1, null)
- schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-
- checkTaskSetId(rootPool, 0)
- resourceOffer(rootPool)
- checkTaskSetId(rootPool, 1)
- resourceOffer(rootPool)
- taskSetManager1.abort()
- checkTaskSetId(rootPool, 2)
- }
-
- test("Fair Scheduler Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new FakeTask(0)
- tasks += task
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
- val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
- val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool)
- schedulableBuilder.buildPools()
-
- assert(rootPool.getSchedulableByName("default") != null)
- assert(rootPool.getSchedulableByName("1") != null)
- assert(rootPool.getSchedulableByName("2") != null)
- assert(rootPool.getSchedulableByName("3") != null)
- assert(rootPool.getSchedulableByName("1").minShare === 2)
- assert(rootPool.getSchedulableByName("1").weight === 1)
- assert(rootPool.getSchedulableByName("2").minShare === 3)
- assert(rootPool.getSchedulableByName("2").weight === 1)
- assert(rootPool.getSchedulableByName("3").minShare === 2)
- assert(rootPool.getSchedulableByName("3").weight === 1)
-
- val properties1 = new Properties()
- properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
- val properties2 = new Properties()
- properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
-
- val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
- val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
- val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
- schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
- schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
- schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-
- val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
- val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
- schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
- schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
-
- checkTaskSetId(rootPool, 0)
- checkTaskSetId(rootPool, 3)
- checkTaskSetId(rootPool, 3)
- checkTaskSetId(rootPool, 1)
- checkTaskSetId(rootPool, 4)
- checkTaskSetId(rootPool, 2)
- checkTaskSetId(rootPool, 2)
- checkTaskSetId(rootPool, 4)
-
- taskSetManager12.taskFinished()
- assert(rootPool.getSchedulableByName("1").runningTasks === 3)
- taskSetManager24.abort()
- assert(rootPool.getSchedulableByName("2").runningTasks === 2)
- }
-
- test("Nested Pool Test") {
- sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new FakeTask(0)
- tasks += task
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
- val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
- val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
- rootPool.addSchedulable(pool0)
- rootPool.addSchedulable(pool1)
-
- val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
- val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
- pool0.addSchedulable(pool00)
- pool0.addSchedulable(pool01)
-
- val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
- val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
- pool1.addSchedulable(pool10)
- pool1.addSchedulable(pool11)
-
- val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
- val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
- pool00.addSchedulable(taskSetManager000)
- pool00.addSchedulable(taskSetManager001)
-
- val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
- val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
- pool01.addSchedulable(taskSetManager010)
- pool01.addSchedulable(taskSetManager011)
-
- val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
- val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
- pool10.addSchedulable(taskSetManager100)
- pool10.addSchedulable(taskSetManager101)
-
- val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
- val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
- pool11.addSchedulable(taskSetManager110)
- pool11.addSchedulable(taskSetManager111)
-
- checkTaskSetId(rootPool, 0)
- checkTaskSetId(rootPool, 4)
- checkTaskSetId(rootPool, 6)
- checkTaskSetId(rootPool, 2)
- }
-}
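
The FIFO test above depends only on the rule used to sort the root pool's queue: lower job priority first, with stage ID breaking ties. A standalone sketch of that comparator (SchedulableStub is a hypothetical stand-in for Schedulable):

    case class SchedulableStub(priority: Int, stageId: Int)

    // Lower priority value wins; equal priorities fall back to the lower stage ID.
    val fifoOrdering: Ordering[SchedulableStub] =
      Ordering.by((s: SchedulableStub) => (s.priority, s.stageId))

    // Seq(SchedulableStub(0, 2), SchedulableStub(0, 0), SchedulableStub(0, 1)).sorted(fifoOrdering)
    //   yields stages 0, 1, 2 -- the offer order the FIFO assertions above check.
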
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
deleted file mode 100644
index 5a0b949..0000000
--- a/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable
-
-import org.scalatest.FunSuite
-
-import spark._
-import spark.scheduler._
-import spark.executor.TaskMetrics
-import java.nio.ByteBuffer
-import spark.util.FakeClock
-
-/**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
- * feedback received from the TaskSetManagers. Note that it's important to initialize this with
- * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
- */
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
- extends ClusterScheduler(sc)
-{
- val startedTasks = new ArrayBuffer[Long]
- val endedTasks = new mutable.HashMap[Long, TaskEndReason]
- val finishedManagers = new ArrayBuffer[TaskSetManager]
-
- val executors = new mutable.HashMap[String, String] ++ liveExecutors
-
- listener = new TaskSchedulerListener {
- def taskStarted(task: Task[_], taskInfo: TaskInfo) {
- startedTasks += taskInfo.index
- }
-
- def taskEnded(
- task: Task[_],
- reason: TaskEndReason,
- result: Any,
- accumUpdates: mutable.Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics)
- {
- endedTasks(taskInfo.index) = reason
- }
-
- def executorGained(execId: String, host: String) {}
-
- def executorLost(execId: String) {}
-
- def taskSetFailed(taskSet: TaskSet, reason: String) {}
- }
-
- def removeExecutor(execId: String): Unit = executors -= execId
-
- override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
-
- override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
-
- override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
-}
-
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
- import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
-
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
-
- test("TaskSet with no preferences") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
- val taskSet = createTaskSet(1)
- val manager = new ClusterTaskSetManager(sched, taskSet)
-
- // Offer a host with no CPUs
- assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
-
- // Offer a host with process-local as the constraint; this should work because the TaskSet
- // above won't have any locality preferences
- val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
- assert(taskOption.isDefined)
- val task = taskOption.get
- assert(task.executorId === "exec1")
- assert(sched.startedTasks.contains(0))
-
- // Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
-
- // Tell it the task has finished
- manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
- assert(sched.endedTasks(0) === Success)
- assert(sched.finishedManagers.contains(manager))
- }
-
- test("multiple offers with no preferences") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
- val taskSet = createTaskSet(3)
- val manager = new ClusterTaskSetManager(sched, taskSet)
-
- // First three offers should all find tasks
- for (i <- 0 until 3) {
- val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
- assert(taskOption.isDefined)
- val task = taskOption.get
- assert(task.executorId === "exec1")
- }
- assert(sched.startedTasks.toSet === Set(0, 1, 2))
-
- // Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
- // Finish the first two tasks
- manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
- manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1))
- assert(sched.endedTasks(0) === Success)
- assert(sched.endedTasks(1) === Success)
- assert(!sched.finishedManagers.contains(manager))
-
- // Finish the last task
- manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2))
- assert(sched.endedTasks(2) === Success)
- assert(sched.finishedManagers.contains(manager))
- }
-
- test("basic delay scheduling") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
- val taskSet = createTaskSet(4,
- Seq(TaskLocation("host1", "exec1")),
- Seq(TaskLocation("host2", "exec2")),
- Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
- Seq() // Last task has no locality prefs
- )
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // First offer host1, exec1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
-
- // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
-
- // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
- assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
-
- // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
-
- // Offer host1, exec1 again, at ANY level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // Offer host1, exec1 again, at ANY level: task 1 should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
- // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
- }
-
- test("delay scheduling with fallback") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc,
- ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
- val taskSet = createTaskSet(5,
- Seq(TaskLocation("host1")),
- Seq(TaskLocation("host2")),
- Seq(TaskLocation("host2")),
- Seq(TaskLocation("host3")),
- Seq(TaskLocation("host2"))
- )
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // First offer host1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Offer host1 again: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // Offer host1 again: second task (on host2) should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
- // Offer host1 again: third task (on host2) should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
-
- // Offer host2: fifth task (also on host2) should get chosen
- assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
-
- // Now that we've launched a local task, we should no longer launch the task for host3
- assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
-
- clock.advance(LOCALITY_WAIT)
-
- // After another delay, we can go ahead and launch that task non-locally
- assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
- }
-
- test("delay scheduling with failed hosts") {
- sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
- val taskSet = createTaskSet(3,
- Seq(TaskLocation("host1")),
- Seq(TaskLocation("host2")),
- Seq(TaskLocation("host3"))
- )
- val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
-
- // First offer host1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
-
- // Offer host1 again: third task should be chosen immediately because host3 is not up
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
-
- // After this, nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
-
- // Now mark host2 as dead
- sched.removeExecutor("exec2")
- manager.executorLost("exec2", "host2")
-
- // Task 1 should immediately be launched on host1 because its original host is gone
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
-
- // Now that all tasks have launched, nothing new should be launched anywhere else
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
- assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
- }
-
- /**
- * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
- * locations for each task (given as varargs) if this sequence is not empty.
- */
- def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
- if (prefLocs.size != 0 && prefLocs.size != numTasks) {
- throw new IllegalArgumentException("Wrong number of task locations")
- }
- val tasks = Array.tabulate[Task[_]](numTasks) { i =>
- new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
- }
- new TaskSet(tasks, 0, 0, 0, null)
- }
-
- def createTaskResult(id: Int): ByteBuffer = {
- ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics)))
- }
-}
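
For readers unfamiliar with the delay-scheduling behaviour the suite above exercises, here is a minimal, self-contained sketch of the idea. This is not Spark's ClusterTaskSetManager; the object, constant and method names below are illustrative only. The allowed locality level starts at the most local option and is relaxed one step at a time after each wait period passes without a local launch.

object DelaySchedulingSketch {
  // Locality levels in increasing order of "distance" from the data.
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }
  import Locality._

  val LOCALITY_WAIT_MS = 3000L  // illustrative constant; the real scheduler reads its wait from configuration

  // Given the time of the last local launch, return the most relaxed level allowed now:
  // one extra level is unlocked for every full wait period that has elapsed.
  def allowedLevel(lastLocalLaunchMs: Long, nowMs: Long): Locality.Value = {
    val waited = nowMs - lastLocalLaunchMs
    if (waited < LOCALITY_WAIT_MS) PROCESS_LOCAL
    else if (waited < 2 * LOCALITY_WAIT_MS) NODE_LOCAL
    else if (waited < 3 * LOCALITY_WAIT_MS) RACK_LOCAL
    else ANY
  }

  def main(args: Array[String]): Unit = {
    println(allowedLevel(0L, 1000L))   // PROCESS_LOCAL: keep insisting on local tasks
    println(allowedLevel(0L, 4000L))   // NODE_LOCAL: one wait period has passed
    println(allowedLevel(0L, 10000L))  // ANY: enough time has passed to run anywhere
  }
}

The clock.advance(LOCALITY_WAIT) calls in the tests above drive exactly this kind of escalation from NODE_LOCAL to ANY before a non-local offer is accepted.
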
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
deleted file mode 100644
index de9e66b..0000000
--- a/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-import spark.scheduler.{TaskLocation, Task}
-
-class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId) {
- override def run(attemptId: Long): Int = 0
-
- override def preferredLocations: Seq[TaskLocation] = prefLocs
-}
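
FakeTask above exists only so the suites can control preferredLocations directly. As a hedged, self-contained sketch of why that is enough to test offer matching (Location, StubTask and StubMatcher are stand-in names invented here, not Spark types):

// A tiny model of matching a resource offer against task locality preferences.
case class Location(host: String)

class StubTask(val index: Int, val prefLocs: Seq[Location]) {
  def prefersHost(host: String): Boolean = prefLocs.exists(_.host == host)
}

object StubMatcher {
  // Pick the lowest-index unlaunched task that prefers the offered host, if any.
  def pickTask(tasks: Seq[StubTask], launched: Set[Int], host: String): Option[StubTask] =
    tasks.find(t => !launched(t.index) && t.prefersHost(host))

  def main(args: Array[String]): Unit = {
    val tasks = Seq(
      new StubTask(0, Seq(Location("host1"))),
      new StubTask(1, Seq(Location("host2"))))
    println(pickTask(tasks, Set.empty, "host2").map(_.index))  // prints Some(1)
  }
}

Roughly speaking, the real task set manager layers locality levels and wait timers on top of this basic "does the offer match a preference?" check, which is why a no-op task carrying only preferences is sufficient for the suites.
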
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
deleted file mode 100644
index d28ee47..0000000
--- a/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.local
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-import java.util.concurrent.Semaphore
-import java.util.concurrent.CountDownLatch
-import java.util.Properties
-
-class Lock() {
- var finished = false
- def jobWait() = {
- synchronized {
- while(!finished) {
- this.wait()
- }
- }
- }
-
- def jobFinished() = {
- synchronized {
- finished = true
- this.notifyAll()
- }
- }
-}
-
-object TaskThreadInfo {
- val threadToLock = HashMap[Int, Lock]()
- val threadToRunning = HashMap[Int, Boolean]()
- val threadToStarted = HashMap[Int, CountDownLatch]()
-}
-
-/*
- * 1. each thread contains one job.
- * 2. each job contains one stage.
- * 3. each stage only contains one task.
- * 4. each launched task must be started in order (using threadToStarted) to make sure
- *    it gets a cpu core, and it then waits until the user manually releases its "Lock",
- *    at which point the cluster has a free cpu core again.
- * 5. each pending task must use "sleep" to make sure it has been added to the taskSetManager
- *    queue, so that it will be scheduled later once the cluster has free cpu cores.
- */
-class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
-
- def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-
- TaskThreadInfo.threadToRunning(threadIndex) = false
- val nums = sc.parallelize(threadIndex to threadIndex, 1)
- TaskThreadInfo.threadToLock(threadIndex) = new Lock()
- TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
- new Thread {
- if (poolName != null) {
- sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
- }
- override def run() {
- val ans = nums.map(number => {
- TaskThreadInfo.threadToRunning(number) = true
- TaskThreadInfo.threadToStarted(number).countDown()
- TaskThreadInfo.threadToLock(number).jobWait()
- TaskThreadInfo.threadToRunning(number) = false
- number
- }).collect()
- assert(ans.toList === List(threadIndex))
- sem.release()
- }
- }.start()
- }
-
- test("Local FIFO scheduler end-to-end test") {
- System.setProperty("spark.cluster.schedulingmode", "FIFO")
- sc = new SparkContext("local[4]", "test")
- val sem = new Semaphore(0)
-
- createThread(1,null,sc,sem)
- TaskThreadInfo.threadToStarted(1).await()
- createThread(2,null,sc,sem)
- TaskThreadInfo.threadToStarted(2).await()
- createThread(3,null,sc,sem)
- TaskThreadInfo.threadToStarted(3).await()
- createThread(4,null,sc,sem)
- TaskThreadInfo.threadToStarted(4).await()
- // Threads 5 and 6 (whose stages are pending) must satisfy the following two points:
- // 1. the stages (taskSetManagers) of the jobs in threads 5 and 6 should be added to the
- //    taskSetManager queue before TaskThreadInfo.threadToLock(1).jobFinished() executes
- // 2. the stage in thread 5 should have higher priority than the stage in thread 6
- // So we simply "sleep" 1s here for each thread.
- // TODO: any better solution?
- createThread(5,null,sc,sem)
- Thread.sleep(1000)
- createThread(6,null,sc,sem)
- Thread.sleep(1000)
-
- assert(TaskThreadInfo.threadToRunning(1) === true)
- assert(TaskThreadInfo.threadToRunning(2) === true)
- assert(TaskThreadInfo.threadToRunning(3) === true)
- assert(TaskThreadInfo.threadToRunning(4) === true)
- assert(TaskThreadInfo.threadToRunning(5) === false)
- assert(TaskThreadInfo.threadToRunning(6) === false)
-
- TaskThreadInfo.threadToLock(1).jobFinished()
- TaskThreadInfo.threadToStarted(5).await()
-
- assert(TaskThreadInfo.threadToRunning(1) === false)
- assert(TaskThreadInfo.threadToRunning(2) === true)
- assert(TaskThreadInfo.threadToRunning(3) === true)
- assert(TaskThreadInfo.threadToRunning(4) === true)
- assert(TaskThreadInfo.threadToRunning(5) === true)
- assert(TaskThreadInfo.threadToRunning(6) === false)
-
- TaskThreadInfo.threadToLock(3).jobFinished()
- TaskThreadInfo.threadToStarted(6).await()
-
- assert(TaskThreadInfo.threadToRunning(1) === false)
- assert(TaskThreadInfo.threadToRunning(2) === true)
- assert(TaskThreadInfo.threadToRunning(3) === false)
- assert(TaskThreadInfo.threadToRunning(4) === true)
- assert(TaskThreadInfo.threadToRunning(5) === true)
- assert(TaskThreadInfo.threadToRunning(6) === true)
-
- TaskThreadInfo.threadToLock(2).jobFinished()
- TaskThreadInfo.threadToLock(4).jobFinished()
- TaskThreadInfo.threadToLock(5).jobFinished()
- TaskThreadInfo.threadToLock(6).jobFinished()
- sem.acquire(6)
- }
-
- test("Local fair scheduler end-to-end test") {
- sc = new SparkContext("local[8]", "LocalSchedulerSuite")
- val sem = new Semaphore(0)
- System.setProperty("spark.cluster.schedulingmode", "FAIR")
- val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
-
- createThread(10,"1",sc,sem)
- TaskThreadInfo.threadToStarted(10).await()
- createThread(20,"2",sc,sem)
- TaskThreadInfo.threadToStarted(20).await()
- createThread(30,"3",sc,sem)
- TaskThreadInfo.threadToStarted(30).await()
-
- assert(TaskThreadInfo.threadToRunning(10) === true)
- assert(TaskThreadInfo.threadToRunning(20) === true)
- assert(TaskThreadInfo.threadToRunning(30) === true)
-
- createThread(11,"1",sc,sem)
- TaskThreadInfo.threadToStarted(11).await()
- createThread(21,"2",sc,sem)
- TaskThreadInfo.threadToStarted(21).await()
- createThread(31,"3",sc,sem)
- TaskThreadInfo.threadToStarted(31).await()
-
- assert(TaskThreadInfo.threadToRunning(11) === true)
- assert(TaskThreadInfo.threadToRunning(21) === true)
- assert(TaskThreadInfo.threadToRunning(31) === true)
-
- createThread(12,"1",sc,sem)
- TaskThreadInfo.threadToStarted(12).await()
- createThread(22,"2",sc,sem)
- TaskThreadInfo.threadToStarted(22).await()
- createThread(32,"3",sc,sem)
-
- assert(TaskThreadInfo.threadToRunning(12) === true)
- assert(TaskThreadInfo.threadToRunning(22) === true)
- assert(TaskThreadInfo.threadToRunning(32) === false)
-
- TaskThreadInfo.threadToLock(10).jobFinished()
- TaskThreadInfo.threadToStarted(32).await()
-
- assert(TaskThreadInfo.threadToRunning(32) === true)
-
- // 1. Similar to the scenario above: sleep 1s so the stages of 23 and 33 are added to the
- //    taskSetManager queue, and the cluster assigns a free cpu core to stage 23 once stage 11 finishes.
- // 2. The priorities of 23 and 33 are meaningless here because the fair scheduler is in use.
- createThread(23,"2",sc,sem)
- createThread(33,"3",sc,sem)
- Thread.sleep(1000)
-
- TaskThreadInfo.threadToLock(11).jobFinished()
- TaskThreadInfo.threadToStarted(23).await()
-
- assert(TaskThreadInfo.threadToRunning(23) === true)
- assert(TaskThreadInfo.threadToRunning(33) === false)
-
- TaskThreadInfo.threadToLock(12).jobFinished()
- TaskThreadInfo.threadToStarted(33).await()
-
- assert(TaskThreadInfo.threadToRunning(33) === true)
-
- TaskThreadInfo.threadToLock(20).jobFinished()
- TaskThreadInfo.threadToLock(21).jobFinished()
- TaskThreadInfo.threadToLock(22).jobFinished()
- TaskThreadInfo.threadToLock(23).jobFinished()
- TaskThreadInfo.threadToLock(30).jobFinished()
- TaskThreadInfo.threadToLock(31).jobFinished()
- TaskThreadInfo.threadToLock(32).jobFinished()
- TaskThreadInfo.threadToLock(33).jobFinished()
-
- sem.acquire(11)
- }
-}
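
The Lock and threadToStarted CountDownLatch plumbing above is what keeps these end-to-end tests deterministic. A minimal, self-contained sketch of that pattern (illustrative names, not part of the suite) looks like:

import java.util.concurrent.CountDownLatch

// Minimal monitor-style gate, mirroring the suite's Lock class.
class Gate {
  private var finished = false
  def await(): Unit = synchronized { while (!finished) wait() }
  def release(): Unit = synchronized { finished = true; notifyAll() }
}

object GateSketch {
  def main(args: Array[String]): Unit = {
    val started = new CountDownLatch(1)   // signals that the "task" is running
    val gate = new Gate                   // holds the "task" until the driver side releases it

    val worker = new Thread {
      override def run(): Unit = {
        started.countDown()   // tell the test we have started (like threadToStarted)
        gate.await()          // block, keeping our "cpu core" busy (like threadToLock)
      }
    }
    worker.start()

    started.await()           // deterministic: the task is definitely running now
    gate.release()            // let it finish, freeing the core for pending work
    worker.join()
    println("worker finished")
  }
}

Each createThread call in the suite does essentially this once per job: the latch guarantees the task has really started before the test asserts on threadToRunning, and jobFinished() plays the role of release().
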