You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/25 01:35:47 UTC

[16/20] git commit: Renamed ClusterScheduler to TaskSchedulerImpl

Renamed ClusterScheduler to TaskSchedulerImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/30186aa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/30186aa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/30186aa2

Branch: refs/heads/master
Commit: 30186aa2648f90d0ad4e312d28e99c9378ea317a
Parents: c06945c
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 20 14:58:04 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 20 14:58:04 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  20 +-
 .../spark/scheduler/ClusterScheduler.scala      | 473 -------------------
 .../spark/scheduler/TaskResultGetter.scala      |   2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     | 473 +++++++++++++++++++
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   4 +-
 .../cluster/SimrSchedulerBackend.scala          |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   4 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   4 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   4 +-
 .../spark/scheduler/local/LocalBackend.scala    |   6 +-
 .../SparkContextSchedulerCreationSuite.scala    |   6 +-
 .../spark/scheduler/ClusterSchedulerSuite.scala |  10 +-
 .../spark/scheduler/TaskResultGetterSuite.scala |   6 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |   2 +-
 15 files changed, 510 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 663b473..ad3337d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1044,25 +1044,25 @@ object SparkContext {
 
     master match {
       case "local" =>
-        val scheduler = new ClusterScheduler(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
         scheduler
 
       case LOCAL_N_REGEX(threads) =>
-        val scheduler = new ClusterScheduler(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, threads.toInt)
         scheduler.initialize(backend)
         scheduler
 
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        val scheduler = new ClusterScheduler(sc, maxFailures.toInt, isLocal = true)
+        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
         val backend = new LocalBackend(scheduler, threads.toInt)
         scheduler.initialize(backend)
         scheduler
 
       case SPARK_REGEX(sparkUrl) =>
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
         scheduler.initialize(backend)
@@ -1077,7 +1077,7 @@ object SparkContext {
               memoryPerSlaveInt, SparkContext.executorMemoryRequested))
         }
 
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
         val masterUrls = localCluster.start()
@@ -1092,7 +1092,7 @@ object SparkContext {
         val scheduler = try {
           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
         } catch {
           // TODO: Enumerate the exact reasons why it can fail
           // But irrespective of it, it means we cannot proceed !
@@ -1108,7 +1108,7 @@ object SparkContext {
         val scheduler = try {
           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
         } catch {
           case th: Throwable => {
@@ -1118,7 +1118,7 @@ object SparkContext {
 
         val backend = try {
           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
           case th: Throwable => {
@@ -1131,7 +1131,7 @@ object SparkContext {
 
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
@@ -1143,7 +1143,7 @@ object SparkContext {
         scheduler
 
       case SIMR_REGEX(simrUrl) =>
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
         scheduler.initialize(backend)
         scheduler

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
deleted file mode 100644
index 1ad735b..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{TimerTask, Timer}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.concurrent.duration._
-
-import org.apache.spark._
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-
-/**
- * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
- * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
- * It handles common logic, like determining a scheduling order across jobs, waking up to launch
- * speculative tasks, etc.
- * 
- * Clients should first call initialize() and start(), then submit task sets through the
- * runTasks method.
- *
- * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
- * threads, so it needs locks in public API methods to maintain its state. In addition, some
- * SchedulerBackends sycnchronize on themselves when they want to send events here, and then
- * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
- * we are holding a lock on ourselves.
- */
-private[spark] class ClusterScheduler(
-  val sc: SparkContext,
-  val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt,
-  isLocal: Boolean = false) extends TaskScheduler with Logging {
-
-  // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
-
-  // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
-
-  // TaskSetManagers are not thread safe, so any access to one should be synchronized
-  // on this class.
-  val activeTaskSets = new HashMap[String, TaskSetManager]
-
-  val taskIdToTaskSetId = new HashMap[Long, String]
-  val taskIdToExecutorId = new HashMap[Long, String]
-  val taskSetTaskIds = new HashMap[String, HashSet[Long]]
-
-  @volatile private var hasReceivedTask = false
-  @volatile private var hasLaunchedTask = false
-  private val starvationTimer = new Timer(true)
-
-  // Incrementing task IDs
-  val nextTaskId = new AtomicLong(0)
-
-  // Which executor IDs we have executors on
-  val activeExecutorIds = new HashSet[String]
-
-  // The set of executors we have on each host; this is used to compute hostsAlive, which
-  // in turn is used to decide when we can attain data locality on a given host
-  private val executorsByHost = new HashMap[String, HashSet[String]]
-
-  private val executorIdToHost = new HashMap[String, String]
-
-  // Listener object to pass upcalls into
-  var dagScheduler: DAGScheduler = null
-
-  var backend: SchedulerBackend = null
-
-  val mapOutputTracker = SparkEnv.get.mapOutputTracker
-
-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
-  // default scheduler is FIFO
-  val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    System.getProperty("spark.scheduler.mode", "FIFO"))
-
-  // This is a var so that we can reset it for testing purposes.
-  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
-
-  override def setDAGScheduler(dagScheduler: DAGScheduler) {
-    this.dagScheduler = dagScheduler
-  }
-
-  def initialize(backend: SchedulerBackend) {
-    this.backend = backend
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
-    schedulableBuilder = {
-      schedulingMode match {
-        case SchedulingMode.FIFO =>
-          new FIFOSchedulableBuilder(rootPool)
-        case SchedulingMode.FAIR =>
-          new FairSchedulableBuilder(rootPool)
-      }
-    }
-    schedulableBuilder.buildPools()
-  }
-
-  def newTaskId(): Long = nextTaskId.getAndIncrement()
-
-  override def start() {
-    backend.start()
-
-    if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
-      logInfo("Starting speculative execution thread")
-      import sc.env.actorSystem.dispatcher
-      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
-            SPECULATION_INTERVAL milliseconds) {
-        checkSpeculatableTasks()
-      }
-    }
-  }
-
-  override def submitTasks(taskSet: TaskSet) {
-    val tasks = taskSet.tasks
-    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
-    this.synchronized {
-      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
-      activeTaskSets(taskSet.id) = manager
-      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
-      taskSetTaskIds(taskSet.id) = new HashSet[Long]()
-
-      if (!isLocal && !hasReceivedTask) {
-        starvationTimer.scheduleAtFixedRate(new TimerTask() {
-          override def run() {
-            if (!hasLaunchedTask) {
-              logWarning("Initial job has not accepted any resources; " +
-                "check your cluster UI to ensure that workers are registered " +
-                "and have sufficient memory")
-            } else {
-              this.cancel()
-            }
-          }
-        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
-      }
-      hasReceivedTask = true
-    }
-    backend.reviveOffers()
-  }
-
-  override def cancelTasks(stageId: Int): Unit = synchronized {
-    logInfo("Cancelling stage " + stageId)
-    activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
-      // There are two possible cases here:
-      // 1. The task set manager has been created and some tasks have been scheduled.
-      //    In this case, send a kill signal to the executors to kill the task and then abort
-      //    the stage.
-      // 2. The task set manager has been created but no tasks has been scheduled. In this case,
-      //    simply abort the stage.
-      val taskIds = taskSetTaskIds(tsm.taskSet.id)
-      if (taskIds.size > 0) {
-        taskIds.foreach { tid =>
-          val execId = taskIdToExecutorId(tid)
-          backend.killTask(tid, execId)
-        }
-      }
-      logInfo("Stage %d was cancelled".format(stageId))
-      tsm.removeAllRunningTasks()
-      taskSetFinished(tsm)
-    }
-  }
-
-  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
-    // Check to see if the given task set has been removed. This is possible in the case of
-    // multiple unrecoverable task failures (e.g. if the entire task set is killed when it has
-    // more than one running tasks).
-    if (activeTaskSets.contains(manager.taskSet.id)) {
-      activeTaskSets -= manager.taskSet.id
-      manager.parent.removeSchedulable(manager)
-      logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
-      taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
-      taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
-      taskSetTaskIds.remove(manager.taskSet.id)
-    }
-  }
-
-  /**
-   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
-   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
-   * that tasks are balanced across the cluster.
-   */
-  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
-    SparkEnv.set(sc.env)
-
-    // Mark each slave as alive and remember its hostname
-    for (o <- offers) {
-      executorIdToHost(o.executorId) = o.host
-      if (!executorsByHost.contains(o.host)) {
-        executorsByHost(o.host) = new HashSet[String]()
-        executorGained(o.executorId, o.host)
-      }
-    }
-
-    // Build a list of tasks to assign to each worker
-    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-    val availableCpus = offers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
-    for (taskSet <- sortedTaskSets) {
-      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
-        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
-    }
-
-    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
-    // of locality levels so that it gets a chance to launch local tasks on all of them.
-    var launchedTask = false
-    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
-      do {
-        launchedTask = false
-        for (i <- 0 until offers.size) {
-          val execId = offers(i).executorId
-          val host = offers(i).host
-          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetId(tid) = taskSet.taskSet.id
-            taskSetTaskIds(taskSet.taskSet.id) += tid
-            taskIdToExecutorId(tid) = execId
-            activeExecutorIds += execId
-            executorsByHost(host) += execId
-            availableCpus(i) -= 1
-            launchedTask = true
-          }
-        }
-      } while (launchedTask)
-    }
-
-    if (tasks.size > 0) {
-      hasLaunchedTask = true
-    }
-    return tasks
-  }
-
-  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    var failedExecutor: Option[String] = None
-    synchronized {
-      try {
-        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
-          // We lost this entire executor, so remember that it's gone
-          val execId = taskIdToExecutorId(tid)
-          if (activeExecutorIds.contains(execId)) {
-            removeExecutor(execId)
-            failedExecutor = Some(execId)
-          }
-        }
-        taskIdToTaskSetId.get(tid) match {
-          case Some(taskSetId) =>
-            if (TaskState.isFinished(state)) {
-              taskIdToTaskSetId.remove(tid)
-              if (taskSetTaskIds.contains(taskSetId)) {
-                taskSetTaskIds(taskSetId) -= tid
-              }
-              taskIdToExecutorId.remove(tid)
-            }
-            activeTaskSets.get(taskSetId).foreach { taskSet =>
-              if (state == TaskState.FINISHED) {
-                taskSet.removeRunningTask(tid)
-                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
-              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
-                taskSet.removeRunningTask(tid)
-                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
-              }
-            }
-          case None =>
-            logInfo("Ignoring update from TID " + tid + " because its task set is gone")
-        }
-      } catch {
-        case e: Exception => logError("Exception in statusUpdate", e)
-      }
-    }
-    // Update the DAGScheduler without holding a lock on this, since that can deadlock
-    if (failedExecutor != None) {
-      dagScheduler.executorLost(failedExecutor.get)
-      backend.reviveOffers()
-    }
-  }
-
-  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
-    taskSetManager.handleTaskGettingResult(tid)
-  }
-
-  def handleSuccessfulTask(
-    taskSetManager: TaskSetManager,
-    tid: Long,
-    taskResult: DirectTaskResult[_]) = synchronized {
-    taskSetManager.handleSuccessfulTask(tid, taskResult)
-  }
-
-  def handleFailedTask(
-    taskSetManager: TaskSetManager,
-    tid: Long,
-    taskState: TaskState,
-    reason: Option[TaskEndReason]) = synchronized {
-    taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState != TaskState.KILLED) {
-      // Need to revive offers again now that the task set manager state has been updated to
-      // reflect failed tasks that need to be re-run.
-      backend.reviveOffers()
-    }
-  }
-
-  def error(message: String) {
-    synchronized {
-      if (activeTaskSets.size > 0) {
-        // Have each task set throw a SparkException with the error
-        for ((taskSetId, manager) <- activeTaskSets) {
-          try {
-            manager.error(message)
-          } catch {
-            case e: Exception => logError("Exception in error callback", e)
-          }
-        }
-      } else {
-        // No task sets are active but we still got an error. Just exit since this
-        // must mean the error is during registration.
-        // It might be good to do something smarter here in the future.
-        logError("Exiting due to error from cluster scheduler: " + message)
-        System.exit(1)
-      }
-    }
-  }
-
-  override def stop() {
-    if (backend != null) {
-      backend.stop()
-    }
-    if (taskResultGetter != null) {
-      taskResultGetter.stop()
-    }
-
-    // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
-    // TODO: Do something better !
-    Thread.sleep(5000L)
-  }
-
-  override def defaultParallelism() = backend.defaultParallelism()
-
-  // Check for speculatable tasks in all our active jobs.
-  def checkSpeculatableTasks() {
-    var shouldRevive = false
-    synchronized {
-      shouldRevive = rootPool.checkSpeculatableTasks()
-    }
-    if (shouldRevive) {
-      backend.reviveOffers()
-    }
-  }
-
-  // Check for pending tasks in all our active jobs.
-  def hasPendingTasks: Boolean = {
-    synchronized {
-      rootPool.hasPendingTasks()
-    }
-  }
-
-  def executorLost(executorId: String, reason: ExecutorLossReason) {
-    var failedExecutor: Option[String] = None
-
-    synchronized {
-      if (activeExecutorIds.contains(executorId)) {
-        val hostPort = executorIdToHost(executorId)
-        logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
-        removeExecutor(executorId)
-        failedExecutor = Some(executorId)
-      } else {
-         // We may get multiple executorLost() calls with different loss reasons. For example, one
-         // may be triggered by a dropped connection from the slave while another may be a report
-         // of executor termination from Mesos. We produce log messages for both so we eventually
-         // report the termination reason.
-         logError("Lost an executor " + executorId + " (already removed): " + reason)
-      }
-    }
-    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
-    if (failedExecutor != None) {
-      dagScheduler.executorLost(failedExecutor.get)
-      backend.reviveOffers()
-    }
-  }
-
-  /** Remove an executor from all our data structures and mark it as lost */
-  private def removeExecutor(executorId: String) {
-    activeExecutorIds -= executorId
-    val host = executorIdToHost(executorId)
-    val execs = executorsByHost.getOrElse(host, new HashSet)
-    execs -= executorId
-    if (execs.isEmpty) {
-      executorsByHost -= host
-    }
-    executorIdToHost -= executorId
-    rootPool.executorLost(executorId, host)
-  }
-
-  def executorGained(execId: String, host: String) {
-    dagScheduler.executorGained(execId, host)
-  }
-
-  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
-    executorsByHost.get(host).map(_.toSet)
-  }
-
-  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
-    executorsByHost.contains(host)
-  }
-
-  def isExecutorAlive(execId: String): Boolean = synchronized {
-    activeExecutorIds.contains(execId)
-  }
-
-  // By default, rack is unknown
-  def getRackForHost(value: String): Option[String] = None
-}
-
-
-private[spark] object ClusterScheduler {
-  /**
-   * Used to balance containers across hosts.
-   *
-   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
-   * resource offers representing the order in which the offers should be used.  The resource
-   * offers are ordered such that we'll allocate one container on each host before allocating a
-   * second container on any host, and so on, in order to reduce the damage if a host fails.
-   *
-   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, returns
-   * [o1, o5, o4, 02, o6, o3]
-   */
-  def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
-    val _keyList = new ArrayBuffer[K](map.size)
-    _keyList ++= map.keys
-
-    // order keyList based on population of value in map
-    val keyList = _keyList.sortWith(
-      (left, right) => map(left).size > map(right).size
-    )
-
-    val retval = new ArrayBuffer[T](keyList.size * 2)
-    var index = 0
-    var found = true
-
-    while (found) {
-      found = false
-      for (key <- keyList) {
-        val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
-        assert(containerList != null)
-        // Get the index'th entry for this host - if present
-        if (index < containerList.size){
-          retval += containerList.apply(index)
-          found = true
-        }
-      }
-      index += 1
-    }
-
-    retval.toList
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 7b5543e..8910272 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
  */
-private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
   private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
new file mode 100644
index 0000000..7409168
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.concurrent.duration._
+
+import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
+/**
+ * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
+ * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ * 
+ * Clients should first call initialize() and start(), then submit task sets through the
+ * runTasks method.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends sycnchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
+ */
+private[spark] class TaskSchedulerImpl(
+  val sc: SparkContext,
+  val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+  isLocal: Boolean = false) extends TaskScheduler with Logging {
+
+  // How often to check for speculative tasks
+  val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+
+  // Threshold above which we warn user initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+
+  // TaskSetManagers are not thread safe, so any access to one should be synchronized
+  // on this class.
+  val activeTaskSets = new HashMap[String, TaskSetManager]
+
+  val taskIdToTaskSetId = new HashMap[Long, String]
+  val taskIdToExecutorId = new HashMap[Long, String]
+  val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+
+  @volatile private var hasReceivedTask = false
+  @volatile private var hasLaunchedTask = false
+  private val starvationTimer = new Timer(true)
+
+  // Incrementing task IDs
+  val nextTaskId = new AtomicLong(0)
+
+  // Which executor IDs we have executors on
+  val activeExecutorIds = new HashSet[String]
+
+  // The set of executors we have on each host; this is used to compute hostsAlive, which
+  // in turn is used to decide when we can attain data locality on a given host
+  private val executorsByHost = new HashMap[String, HashSet[String]]
+
+  private val executorIdToHost = new HashMap[String, String]
+
+  // Listener object to pass upcalls into
+  var dagScheduler: DAGScheduler = null
+
+  var backend: SchedulerBackend = null
+
+  val mapOutputTracker = SparkEnv.get.mapOutputTracker
+
+  var schedulableBuilder: SchedulableBuilder = null
+  var rootPool: Pool = null
+  // default scheduler is FIFO
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.scheduler.mode", "FIFO"))
+
+  // This is a var so that we can reset it for testing purposes.
+  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
+
+  override def setDAGScheduler(dagScheduler: DAGScheduler) {
+    this.dagScheduler = dagScheduler
+  }
+
+  def initialize(backend: SchedulerBackend) {
+    this.backend = backend
+    // temporarily set rootPool name to empty
+    rootPool = new Pool("", schedulingMode, 0, 0)
+    schedulableBuilder = {
+      schedulingMode match {
+        case SchedulingMode.FIFO =>
+          new FIFOSchedulableBuilder(rootPool)
+        case SchedulingMode.FAIR =>
+          new FairSchedulableBuilder(rootPool)
+      }
+    }
+    schedulableBuilder.buildPools()
+  }
+
+  def newTaskId(): Long = nextTaskId.getAndIncrement()
+
+  override def start() {
+    backend.start()
+
+    if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
+      logInfo("Starting speculative execution thread")
+      import sc.env.actorSystem.dispatcher
+      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
+            SPECULATION_INTERVAL milliseconds) {
+        checkSpeculatableTasks()
+      }
+    }
+  }
+
+  override def submitTasks(taskSet: TaskSet) {
+    val tasks = taskSet.tasks
+    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
+    this.synchronized {
+      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
+      activeTaskSets(taskSet.id) = manager
+      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
+      taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (!isLocal && !hasReceivedTask) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial job has not accepted any resources; " +
+                "check your cluster UI to ensure that workers are registered " +
+                "and have sufficient memory")
+            } else {
+              this.cancel()
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true
+    }
+    backend.reviveOffers()
+  }
+
+  override def cancelTasks(stageId: Int): Unit = synchronized {
+    logInfo("Cancelling stage " + stageId)
+    activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
+      // There are two possible cases here:
+      // 1. The task set manager has been created and some tasks have been scheduled.
+      //    In this case, send a kill signal to the executors to kill the task and then abort
+      //    the stage.
+      // 2. The task set manager has been created but no tasks has been scheduled. In this case,
+      //    simply abort the stage.
+      val taskIds = taskSetTaskIds(tsm.taskSet.id)
+      if (taskIds.size > 0) {
+        taskIds.foreach { tid =>
+          val execId = taskIdToExecutorId(tid)
+          backend.killTask(tid, execId)
+        }
+      }
+      logInfo("Stage %d was cancelled".format(stageId))
+      tsm.removeAllRunningTasks()
+      taskSetFinished(tsm)
+    }
+  }
+
+  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
+    // Check to see if the given task set has been removed. This is possible in the case of
+    // multiple unrecoverable task failures (e.g. if the entire task set is killed when it has
+    // more than one running tasks).
+    if (activeTaskSets.contains(manager.taskSet.id)) {
+      activeTaskSets -= manager.taskSet.id
+      manager.parent.removeSchedulable(manager)
+      logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
+      taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
+      taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
+      taskSetTaskIds.remove(manager.taskSet.id)
+    }
+  }
+
+  /**
+   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
+   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
+   * that tasks are balanced across the cluster.
+   */
+  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+    SparkEnv.set(sc.env)
+
+    // Mark each slave as alive and remember its hostname
+    for (o <- offers) {
+      executorIdToHost(o.executorId) = o.host
+      if (!executorsByHost.contains(o.host)) {
+        executorsByHost(o.host) = new HashSet[String]()
+        executorGained(o.executorId, o.host)
+      }
+    }
+
+    // Build a list of tasks to assign to each worker
+    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val availableCpus = offers.map(o => o.cores).toArray
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    for (taskSet <- sortedTaskSets) {
+      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
+        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+    }
+
+    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+    // of locality levels so that it gets a chance to launch local tasks on all of them.
+    var launchedTask = false
+    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+      do {
+        launchedTask = false
+        for (i <- 0 until offers.size) {
+          val execId = offers(i).executorId
+          val host = offers(i).host
+          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
+            tasks(i) += task
+            val tid = task.taskId
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskSetTaskIds(taskSet.taskSet.id) += tid
+            taskIdToExecutorId(tid) = execId
+            activeExecutorIds += execId
+            executorsByHost(host) += execId
+            availableCpus(i) -= 1
+            launchedTask = true
+          }
+        }
+      } while (launchedTask)
+    }
+
+    if (tasks.size > 0) {
+      hasLaunchedTask = true
+    }
+    return tasks
+  }
+
+  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    var failedExecutor: Option[String] = None
+    synchronized {
+      try {
+        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
+          // We lost this entire executor, so remember that it's gone
+          val execId = taskIdToExecutorId(tid)
+          if (activeExecutorIds.contains(execId)) {
+            removeExecutor(execId)
+            failedExecutor = Some(execId)
+          }
+        }
+        taskIdToTaskSetId.get(tid) match {
+          case Some(taskSetId) =>
+            if (TaskState.isFinished(state)) {
+              taskIdToTaskSetId.remove(tid)
+              if (taskSetTaskIds.contains(taskSetId)) {
+                taskSetTaskIds(taskSetId) -= tid
+              }
+              taskIdToExecutorId.remove(tid)
+            }
+            activeTaskSets.get(taskSetId).foreach { taskSet =>
+              if (state == TaskState.FINISHED) {
+                taskSet.removeRunningTask(tid)
+                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+                taskSet.removeRunningTask(tid)
+                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+              }
+            }
+          case None =>
+            logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+        }
+      } catch {
+        case e: Exception => logError("Exception in statusUpdate", e)
+      }
+    }
+    // Update the DAGScheduler without holding a lock on this, since that can deadlock
+    if (failedExecutor != None) {
+      dagScheduler.executorLost(failedExecutor.get)
+      backend.reviveOffers()
+    }
+  }
+
+  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
+    taskSetManager.handleTaskGettingResult(tid)
+  }
+
+  def handleSuccessfulTask(
+    taskSetManager: TaskSetManager,
+    tid: Long,
+    taskResult: DirectTaskResult[_]) = synchronized {
+    taskSetManager.handleSuccessfulTask(tid, taskResult)
+  }
+
+  def handleFailedTask(
+    taskSetManager: TaskSetManager,
+    tid: Long,
+    taskState: TaskState,
+    reason: Option[TaskEndReason]) = synchronized {
+    taskSetManager.handleFailedTask(tid, taskState, reason)
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
+      backend.reviveOffers()
+    }
+  }
+
+  def error(message: String) {
+    synchronized {
+      if (activeTaskSets.size > 0) {
+        // Have each task set throw a SparkException with the error
+        for ((taskSetId, manager) <- activeTaskSets) {
+          try {
+            manager.error(message)
+          } catch {
+            case e: Exception => logError("Exception in error callback", e)
+          }
+        }
+      } else {
+        // No task sets are active but we still got an error. Just exit since this
+        // must mean the error is during registration.
+        // It might be good to do something smarter here in the future.
+        logError("Exiting due to error from cluster scheduler: " + message)
+        System.exit(1)
+      }
+    }
+  }
+
+  override def stop() {
+    if (backend != null) {
+      backend.stop()
+    }
+    if (taskResultGetter != null) {
+      taskResultGetter.stop()
+    }
+
+    // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
+    // TODO: Do something better !
+    Thread.sleep(5000L)
+  }
+
+  override def defaultParallelism() = backend.defaultParallelism()
+
+  // Check for speculatable tasks in all our active jobs.
+  def checkSpeculatableTasks() {
+    var shouldRevive = false
+    synchronized {
+      shouldRevive = rootPool.checkSpeculatableTasks()
+    }
+    if (shouldRevive) {
+      backend.reviveOffers()
+    }
+  }
+
+  // Check for pending tasks in all our active jobs.
+  def hasPendingTasks: Boolean = {
+    synchronized {
+      rootPool.hasPendingTasks()
+    }
+  }
+
+  def executorLost(executorId: String, reason: ExecutorLossReason) {
+    var failedExecutor: Option[String] = None
+
+    synchronized {
+      if (activeExecutorIds.contains(executorId)) {
+        val hostPort = executorIdToHost(executorId)
+        logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
+        removeExecutor(executorId)
+        failedExecutor = Some(executorId)
+      } else {
+         // We may get multiple executorLost() calls with different loss reasons. For example, one
+         // may be triggered by a dropped connection from the slave while another may be a report
+         // of executor termination from Mesos. We produce log messages for both so we eventually
+         // report the termination reason.
+         logError("Lost an executor " + executorId + " (already removed): " + reason)
+      }
+    }
+    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
+    if (failedExecutor != None) {
+      dagScheduler.executorLost(failedExecutor.get)
+      backend.reviveOffers()
+    }
+  }
+
+  /** Remove an executor from all our data structures and mark it as lost */
+  private def removeExecutor(executorId: String) {
+    activeExecutorIds -= executorId
+    val host = executorIdToHost(executorId)
+    val execs = executorsByHost.getOrElse(host, new HashSet)
+    execs -= executorId
+    if (execs.isEmpty) {
+      executorsByHost -= host
+    }
+    executorIdToHost -= executorId
+    rootPool.executorLost(executorId, host)
+  }
+
+  def executorGained(execId: String, host: String) {
+    dagScheduler.executorGained(execId, host)
+  }
+
+  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
+    executorsByHost.get(host).map(_.toSet)
+  }
+
+  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
+    executorsByHost.contains(host)
+  }
+
+  def isExecutorAlive(execId: String): Boolean = synchronized {
+    activeExecutorIds.contains(execId)
+  }
+
+  // By default, rack is unknown
+  def getRackForHost(value: String): Option[String] = None
+}
+
+
+private[spark] object TaskSchedulerImpl {
+  /**
+   * Used to balance containers across hosts.
+   *
+   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+   * resource offers representing the order in which the offers should be used.  The resource
+   * offers are ordered such that we'll allocate one container on each host before allocating a
+   * second container on any host, and so on, in order to reduce the damage if a host fails.
+   *
+   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, returns
+   * [o1, o5, o4, 02, o6, o3]
+   */
+  def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
+    val _keyList = new ArrayBuffer[K](map.size)
+    _keyList ++= map.keys
+
+    // order keyList based on population of value in map
+    val keyList = _keyList.sortWith(
+      (left, right) => map(left).size > map(right).size
+    )
+
+    val retval = new ArrayBuffer[T](keyList.size * 2)
+    var index = 0
+    var found = true
+
+    while (found) {
+      found = false
+      for (key <- keyList) {
+        val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
+        assert(containerList != null)
+        // Get the index'th entry for this host - if present
+        if (index < containerList.size){
+          retval += containerList.apply(index)
+          found = true
+        }
+      }
+      index += 1
+    }
+
+    retval.toList
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 0fe413a..0ac9829 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -48,7 +48,7 @@ import java.io.NotSerializableException
  *                        task set will be aborted
  */
 private[spark] class TaskSetManager(
-    sched: ClusterScheduler,
+    sched: TaskSchedulerImpl,
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
     clock: Clock = SystemClock)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5797783..5c534a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -29,7 +29,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.{SparkException, Logging, TaskState}
 import org.apache.spark.{Logging, SparkException, TaskState}
-import org.apache.spark.scheduler.{ClusterScheduler, SchedulerBackend, SlaveLost, TaskDescription,
+import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
   WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -43,7 +43,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * (spark.deploy.*).
  */
 private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 2fbd725..ec3e68e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.scheduler.ClusterScheduler
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class SimrSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     driverFilePath: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 1d38f0d..404ce7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -22,11 +22,11 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.deploy.client.{Client, ClientListener}
 import org.apache.spark.deploy.{Command, ApplicationDescription}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, ClusterScheduler}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.util.Utils
 
 private[spark] class SparkDeploySchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     masters: Array[String],
     appName: String)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5481828..39573fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
 import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.ClusterScheduler
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
 /**
@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
  * remove this.
  */
 private[spark] class CoarseMesosSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     master: String,
     appName: String)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 773b980..6aa788c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
 
 import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
-  TaskDescription, ClusterScheduler, WorkerOffer}
+  TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.util.Utils
 
 /**
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
  * from multiple apps can run on different cores) and in time (a core can switch ownership).
  */
 private[spark] class MesosSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     master: String,
     appName: String)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 6b5f1a5..69c1c04 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -24,7 +24,7 @@ import akka.actor.{Actor, ActorRef, Props}
 import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
 
 private case class ReviveOffers()
 
@@ -38,7 +38,7 @@ private case class KillTask(taskId: Long)
  * and the ClusterScheduler.
  */
 private[spark] class LocalActor(
-  scheduler: ClusterScheduler,
+  scheduler: TaskSchedulerImpl,
   executorBackend: LocalBackend,
   private val totalCores: Int) extends Actor with Logging {
 
@@ -78,7 +78,7 @@ private[spark] class LocalActor(
  * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
-private[spark] class LocalBackend(scheduler: ClusterScheduler, val totalCores: Int)
+private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
   extends SchedulerBackend with ExecutorBackend {
 
   var localActor: ActorRef = null

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index d4a7a11..9deed56 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import org.scalatest.{FunSuite, PrivateMethodTester}
 
-import org.apache.spark.scheduler.{ClusterScheduler, TaskScheduler}
+import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
@@ -27,13 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
 
-  def createTaskScheduler(master: String): ClusterScheduler = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
     sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
-    sched.asInstanceOf[ClusterScheduler]
+    sched.asInstanceOf[TaskSchedulerImpl]
   }
 
   test("bad-master") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 35a06c4..702edb8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -29,7 +29,7 @@ class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
     initNumTasks: Int,
-    clusterScheduler: ClusterScheduler,
+    clusterScheduler: TaskSchedulerImpl,
     taskSet: TaskSet)
   extends TaskSetManager(clusterScheduler, taskSet, 0) {
 
@@ -104,7 +104,7 @@ class FakeTaskSetManager(
 
 class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
     new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
   }
 
@@ -131,7 +131,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
 
   test("FIFO Scheduler Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    val clusterScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -158,7 +158,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
 
   test("Fair Scheduler Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    val clusterScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -215,7 +215,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
 
   test("Nested Pool Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    val clusterScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 9784920..2265619 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.TaskResultBlockId
  * Used to test the case where a BlockManager evicts the task result (or dies) before the
  * TaskResult is retrieved.
  */
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends TaskResultGetter(sparkEnv, scheduler) {
   var removedResult = false
 
@@ -92,8 +92,8 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
     sc = new SparkContext("local[1,1]", "test")
     // If this test hangs, it's probably because no resource offers were made after the task
     // failed.
-    val scheduler: ClusterScheduler = sc.taskScheduler match {
-      case clusterScheduler: ClusterScheduler =>
+    val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
+      case clusterScheduler: TaskSchedulerImpl =>
         clusterScheduler
       case _ =>
         assert(false, "Expect local cluster to use ClusterScheduler")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index b34b6f3..771a64f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -58,7 +58,7 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
  * to work, and these are required for locality in TaskSetManager.
  */
 class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
-  extends ClusterScheduler(sc)
+  extends TaskSchedulerImpl(sc)
 {
   val startedTasks = new ArrayBuffer[Long]
   val endedTasks = new mutable.HashMap[Long, TaskEndReason]