You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/27 07:52:50 UTC

git commit: Remove references to ClusterScheduler (SPARK-1140)

Repository: spark
Updated Branches:
  refs/heads/master 26450351a -> 71f69d66c


Remove references to ClusterScheduler (SPARK-1140)

ClusterScheduler was renamed to TaskSchedulerImpl; this commit
updates comments and tests accordingly.

Author: Kay Ousterhout <ka...@gmail.com>

Closes #9 from kayousterhout/cluster_scheduler_death and squashes the following commits:

d6fd119 [Kay Ousterhout] Remove references to ClusterScheduler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71f69d66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71f69d66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71f69d66

Branch: refs/heads/master
Commit: 71f69d66ce50991e99408791ade25a670598d32a
Parents: 2645035
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Feb 26 22:52:42 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Feb 26 22:52:42 2014 -0800

----------------------------------------------------------------------
 .../spark/scheduler/SchedulerBackend.scala      |   2 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   2 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   7 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   2 +-
 .../spark/scheduler/local/LocalBackend.scala    |   4 +-
 .../spark/scheduler/ClusterSchedulerSuite.scala | 268 -------------------
 .../spark/scheduler/TaskResultGetterSuite.scala |   6 +-
 .../scheduler/TaskSchedulerImplSuite.scala      | 268 +++++++++++++++++++
 .../spark/scheduler/TaskSetManagerSuite.scala   |  20 +-
 9 files changed, 290 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index eefc8c2..f1924a4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 /**
  * A backend interface for scheduling systems that allows plugging in different ones under
- * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
  * machines become available and can launch tasks on them.
  */
 private[spark] trait SchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1cdfed1..92616c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
 /**
- * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler.
+ * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
  * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
  * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
  * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1a4b7e5..5ea4557 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -26,13 +26,14 @@ import scala.collection.mutable.HashSet
 import scala.math.max
 import scala.math.min
 
-import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
+  SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.{Clock, SystemClock}
 
 /**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
  * each task, retries tasks if they fail (up to a limited number of times), and
  * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
  * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
@@ -41,7 +42,7 @@ import org.apache.spark.util.{Clock, SystemClock}
  * THREADING: This class is designed to only be called from code with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
  *
- * @param sched           the ClusterScheduler associated with the TaskSetManager
+ * @param sched           the TaskSchedulerImpl associated with the TaskSetManager
  * @param taskSet         the TaskSet to manage scheduling for
  * @param maxTaskFailures if any particular task fails more than this number of times, the entire
  *                        task set will be aborted

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c576beb..bcf0ce1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -203,7 +203,7 @@ private[spark] class MesosSchedulerBackend(
             getResource(offer.getResourcesList, "cpus").toInt)
         }
 
-        // Call into the ClusterScheduler
+        // Call into the TaskSchedulerImpl
         val taskLists = scheduler.resourceOffers(offerableWorkers)
 
         // Build a list of Mesos tasks for each slave

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 50f7e79..16e2f5c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -35,7 +35,7 @@ private case class KillTask(taskId: Long)
 /**
  * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
  * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
- * and the ClusterScheduler.
+ * and the TaskSchedulerImpl.
  */
 private[spark] class LocalActor(
   scheduler: TaskSchedulerImpl,
@@ -76,7 +76,7 @@ private[spark] class LocalActor(
 
 /**
  * LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
 private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
deleted file mode 100644
index 85e9299..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.util.Properties
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.scalatest.FunSuite
-
-import org.apache.spark._
-
-class FakeTaskSetManager(
-    initPriority: Int,
-    initStageId: Int,
-    initNumTasks: Int,
-    clusterScheduler: TaskSchedulerImpl,
-    taskSet: TaskSet)
-  extends TaskSetManager(clusterScheduler, taskSet, 0) {
-
-  parent = null
-  weight = 1
-  minShare = 2
-  priority = initPriority
-  stageId = initStageId
-  name = "TaskSet_"+stageId
-  override val numTasks = initNumTasks
-  tasksSuccessful = 0
-
-  var numRunningTasks = 0
-  override def runningTasks = numRunningTasks
-
-  def increaseRunningTasks(taskNum: Int) {
-    numRunningTasks += taskNum
-    if (parent != null) {
-      parent.increaseRunningTasks(taskNum)
-    }
-  }
-
-  def decreaseRunningTasks(taskNum: Int) {
-    numRunningTasks -= taskNum
-    if (parent != null) {
-      parent.decreaseRunningTasks(taskNum)
-    }
-  }
-
-  override def addSchedulable(schedulable: Schedulable) {
-  }
-
-  override def removeSchedulable(schedulable: Schedulable) {
-  }
-
-  override def getSchedulableByName(name: String): Schedulable = {
-    null
-  }
-
-  override def executorLost(executorId: String, host: String): Unit = {
-  }
-
-  override def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription] =
-  {
-    if (tasksSuccessful + numRunningTasks < numTasks) {
-      increaseRunningTasks(1)
-      Some(new TaskDescription(0, execId, "task 0:0", 0, null))
-    } else {
-      None
-    }
-  }
-
-  override def checkSpeculatableTasks(): Boolean = {
-    true
-  }
-
-  def taskFinished() {
-    decreaseRunningTasks(1)
-    tasksSuccessful +=1
-    if (tasksSuccessful == numTasks) {
-      parent.removeSchedulable(this)
-    }
-  }
-
-  def abort() {
-    decreaseRunningTasks(numRunningTasks)
-    parent.removeSchedulable(this)
-  }
-}
-
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
-
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
-    new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
-  }
-
-  def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue()
-    /* Just for Test*/
-    for (manager <- taskSetQueue) {
-       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
-         manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
-    }
-    for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
-        case Some(task) =>
-          return taskSet.stageId
-        case None => {}
-      }
-    }
-    -1
-  }
-
-  def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
-    assert(resourceOffer(rootPool) === expectedTaskSetId)
-  }
-
-  test("FIFO Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new TaskSchedulerImpl(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
-    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
-    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
-    schedulableBuilder.buildPools()
-
-    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
-    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
-    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
-    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
-    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-
-    checkTaskSetId(rootPool, 0)
-    resourceOffer(rootPool)
-    checkTaskSetId(rootPool, 1)
-    resourceOffer(rootPool)
-    taskSetManager1.abort()
-    checkTaskSetId(rootPool, 2)
-  }
-
-  test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new TaskSchedulerImpl(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
-    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
-    schedulableBuilder.buildPools()
-
-    assert(rootPool.getSchedulableByName("default") != null)
-    assert(rootPool.getSchedulableByName("1") != null)
-    assert(rootPool.getSchedulableByName("2") != null)
-    assert(rootPool.getSchedulableByName("3") != null)
-    assert(rootPool.getSchedulableByName("1").minShare === 2)
-    assert(rootPool.getSchedulableByName("1").weight === 1)
-    assert(rootPool.getSchedulableByName("2").minShare === 3)
-    assert(rootPool.getSchedulableByName("2").weight === 1)
-    assert(rootPool.getSchedulableByName("3").minShare === 0)
-    assert(rootPool.getSchedulableByName("3").weight === 1)
-
-    val properties1 = new Properties()
-    properties1.setProperty("spark.scheduler.pool","1")
-    val properties2 = new Properties()
-    properties2.setProperty("spark.scheduler.pool","2")
-
-    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
-    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
-    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
-    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
-    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-
-    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
-    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
-    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
-
-    checkTaskSetId(rootPool, 0)
-    checkTaskSetId(rootPool, 3)
-    checkTaskSetId(rootPool, 3)
-    checkTaskSetId(rootPool, 1)
-    checkTaskSetId(rootPool, 4)
-    checkTaskSetId(rootPool, 2)
-    checkTaskSetId(rootPool, 2)
-    checkTaskSetId(rootPool, 4)
-
-    taskSetManager12.taskFinished()
-    assert(rootPool.getSchedulableByName("1").runningTasks === 3)
-    taskSetManager24.abort()
-    assert(rootPool.getSchedulableByName("2").runningTasks === 2)
-  }
-
-  test("Nested Pool Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new TaskSchedulerImpl(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
-    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
-    rootPool.addSchedulable(pool0)
-    rootPool.addSchedulable(pool1)
-
-    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
-    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
-    pool0.addSchedulable(pool00)
-    pool0.addSchedulable(pool01)
-
-    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
-    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
-    pool1.addSchedulable(pool10)
-    pool1.addSchedulable(pool11)
-
-    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
-    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
-    pool00.addSchedulable(taskSetManager000)
-    pool00.addSchedulable(taskSetManager001)
-
-    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
-    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
-    pool01.addSchedulable(taskSetManager010)
-    pool01.addSchedulable(taskSetManager011)
-
-    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
-    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
-    pool10.addSchedulable(taskSetManager100)
-    pool10.addSchedulable(taskSetManager101)
-
-    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
-    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
-    pool11.addSchedulable(taskSetManager110)
-    pool11.addSchedulable(taskSetManager111)
-
-    checkTaskSetId(rootPool, 0)
-    checkTaskSetId(rootPool, 4)
-    checkTaskSetId(rootPool, 6)
-    checkTaskSetId(rootPool, 2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ac07f60..c4e7a4b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -93,10 +93,10 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
     // If this test hangs, it's probably because no resource offers were made after the task
     // failed.
     val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
-      case clusterScheduler: TaskSchedulerImpl =>
-        clusterScheduler
+      case taskScheduler: TaskSchedulerImpl =>
+        taskScheduler
       case _ =>
-        assert(false, "Expect local cluster to use ClusterScheduler")
+        assert(false, "Expect local cluster to use TaskSchedulerImpl")
         throw new ClassCastException
     }
     scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
new file mode 100644
index 0000000..f4e62c6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+
+class FakeTaskSetManager(
+    initPriority: Int,
+    initStageId: Int,
+    initNumTasks: Int,
+    taskScheduler: TaskSchedulerImpl,
+    taskSet: TaskSet)
+  extends TaskSetManager(taskScheduler, taskSet, 0) {
+
+  parent = null
+  weight = 1
+  minShare = 2
+  priority = initPriority
+  stageId = initStageId
+  name = "TaskSet_"+stageId
+  override val numTasks = initNumTasks
+  tasksSuccessful = 0
+
+  var numRunningTasks = 0
+  override def runningTasks = numRunningTasks
+
+  def increaseRunningTasks(taskNum: Int) {
+    numRunningTasks += taskNum
+    if (parent != null) {
+      parent.increaseRunningTasks(taskNum)
+    }
+  }
+
+  def decreaseRunningTasks(taskNum: Int) {
+    numRunningTasks -= taskNum
+    if (parent != null) {
+      parent.decreaseRunningTasks(taskNum)
+    }
+  }
+
+  override def addSchedulable(schedulable: Schedulable) {
+  }
+
+  override def removeSchedulable(schedulable: Schedulable) {
+  }
+
+  override def getSchedulableByName(name: String): Schedulable = {
+    null
+  }
+
+  override def executorLost(executorId: String, host: String): Unit = {
+  }
+
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription] =
+  {
+    if (tasksSuccessful + numRunningTasks < numTasks) {
+      increaseRunningTasks(1)
+      Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+    } else {
+      None
+    }
+  }
+
+  override def checkSpeculatableTasks(): Boolean = {
+    true
+  }
+
+  def taskFinished() {
+    decreaseRunningTasks(1)
+    tasksSuccessful +=1
+    if (tasksSuccessful == numTasks) {
+      parent.removeSchedulable(this)
+    }
+  }
+
+  def abort() {
+    decreaseRunningTasks(numRunningTasks)
+    parent.removeSchedulable(this)
+  }
+}
+
+class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
+
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
+    new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
+  }
+
+  def resourceOffer(rootPool: Pool): Int = {
+    val taskSetQueue = rootPool.getSortedTaskSetQueue()
+    /* Just for Test*/
+    for (manager <- taskSetQueue) {
+       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
+         manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+    }
+    for (taskSet <- taskSetQueue) {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
+        case Some(task) =>
+          return taskSet.stageId
+        case None => {}
+      }
+    }
+    -1
+  }
+
+  def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
+    assert(resourceOffer(rootPool) === expectedTaskSetId)
+  }
+
+  test("FIFO Scheduler Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new FakeTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+    schedulableBuilder.buildPools()
+
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
+    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
+
+    checkTaskSetId(rootPool, 0)
+    resourceOffer(rootPool)
+    checkTaskSetId(rootPool, 1)
+    resourceOffer(rootPool)
+    taskSetManager1.abort()
+    checkTaskSetId(rootPool, 2)
+  }
+
+  test("Fair Scheduler Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new FakeTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    System.setProperty("spark.scheduler.allocation.file", xmlPath)
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    schedulableBuilder.buildPools()
+
+    assert(rootPool.getSchedulableByName("default") != null)
+    assert(rootPool.getSchedulableByName("1") != null)
+    assert(rootPool.getSchedulableByName("2") != null)
+    assert(rootPool.getSchedulableByName("3") != null)
+    assert(rootPool.getSchedulableByName("1").minShare === 2)
+    assert(rootPool.getSchedulableByName("1").weight === 1)
+    assert(rootPool.getSchedulableByName("2").minShare === 3)
+    assert(rootPool.getSchedulableByName("2").weight === 1)
+    assert(rootPool.getSchedulableByName("3").minShare === 0)
+    assert(rootPool.getSchedulableByName("3").weight === 1)
+
+    val properties1 = new Properties()
+    properties1.setProperty("spark.scheduler.pool","1")
+    val properties2 = new Properties()
+    properties2.setProperty("spark.scheduler.pool","2")
+
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
+    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
+
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
+    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
+    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
+
+    checkTaskSetId(rootPool, 0)
+    checkTaskSetId(rootPool, 3)
+    checkTaskSetId(rootPool, 3)
+    checkTaskSetId(rootPool, 1)
+    checkTaskSetId(rootPool, 4)
+    checkTaskSetId(rootPool, 2)
+    checkTaskSetId(rootPool, 2)
+    checkTaskSetId(rootPool, 4)
+
+    taskSetManager12.taskFinished()
+    assert(rootPool.getSchedulableByName("1").runningTasks === 3)
+    taskSetManager24.abort()
+    assert(rootPool.getSchedulableByName("2").runningTasks === 2)
+  }
+
+  test("Nested Pool Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new FakeTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
+    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+    rootPool.addSchedulable(pool0)
+    rootPool.addSchedulable(pool1)
+
+    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
+    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+    pool0.addSchedulable(pool00)
+    pool0.addSchedulable(pool01)
+
+    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
+    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+    pool1.addSchedulable(pool10)
+    pool1.addSchedulable(pool11)
+
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
+    pool00.addSchedulable(taskSetManager000)
+    pool00.addSchedulable(taskSetManager001)
+
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
+    pool01.addSchedulable(taskSetManager010)
+    pool01.addSchedulable(taskSetManager011)
+
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
+    pool10.addSchedulable(taskSetManager100)
+    pool10.addSchedulable(taskSetManager101)
+
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
+    pool11.addSchedulable(taskSetManager110)
+    pool11.addSchedulable(taskSetManager111)
+
+    checkTaskSetId(rootPool, 0)
+    checkTaskSetId(rootPool, 4)
+    checkTaskSetId(rootPool, 6)
+    checkTaskSetId(rootPool, 2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/71f69d66/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 34a7d8c..20f6e50 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.FakeClock
 
-class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
   override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     taskScheduler.startedTasks += taskInfo.index
   }
@@ -51,12 +51,12 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
 }
 
 /**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
  * feedback received from the TaskSetManagers. Note that it's important to initialize this with
  * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
  * to work, and these are required for locality in TaskSetManager.
  */
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
   extends TaskSchedulerImpl(sc)
 {
   val startedTasks = new ArrayBuffer[Long]
@@ -87,7 +87,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
@@ -113,7 +113,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("multiple offers with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
@@ -144,7 +144,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = createTaskSet(4,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec2")),
@@ -188,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with fallback") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc,
+    val sched = new FakeTaskScheduler(sc,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
     val taskSet = createTaskSet(5,
       Seq(TaskLocation("host1")),
@@ -228,7 +228,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = createTaskSet(3,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -260,7 +260,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("task result lost") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -277,7 +277,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("repeated failures lead to task set abortion") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)