Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/26 23:12:23 UTC

[2/3] git commit: Improved organization of scheduling packages.

Improved organization of scheduling packages.

This commit does not change any code -- only file organization.

There are two components of this change:
(1) Moving files out of the cluster package, and up
a level to the scheduler package. These files are all used by
the local scheduler in addition to the cluster scheduler(s), so
they should not live in the cluster package. As a result of this
change, none of the files in the local package reference files in
the cluster package.

(2) Moving the mesos package to within the cluster package.
The mesos scheduling code is for a cluster, and represents a
specific case of cluster scheduling (the Mesos-related classes
often subclass cluster scheduling classes). Thus, the most logical
place for it is within the cluster package.
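As a sketch of that subclassing relationship (signatures simplified;
treat the constructor parameters here as illustrative rather than exact):

    package org.apache.spark.scheduler.cluster.mesos

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}

    // The coarse-grained Mesos backend reuses the standalone cluster
    // backend's task-launching machinery and layers Mesos resource
    // offers on top of it.
    private[spark] class CoarseMesosSchedulerBackend(
        scheduler: ClusterScheduler,
        sc: SparkContext,
        master: String,
        appName: String)
      extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) {
      // Mesos-specific offer handling elided in this sketch.
    }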


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d85fe41b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d85fe41b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d85fe41b

Branch: refs/heads/master
Commit: d85fe41b2b380e2879cb18008dbeb344ed7d7c92
Parents: 7220e8f
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Sep 25 12:18:32 2013 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Sep 25 12:45:46 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   4 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   1 -
 .../spark/scheduler/DAGSchedulerEvent.scala     |   1 -
 .../org/apache/spark/scheduler/JobLogger.scala  |   1 -
 .../scala/org/apache/spark/scheduler/Pool.scala | 121 +++++++
 .../apache/spark/scheduler/Schedulable.scala    |  48 +++
 .../spark/scheduler/SchedulableBuilder.scala    | 150 ++++++++
 .../spark/scheduler/SchedulingAlgorithm.scala   |  81 +++++
 .../apache/spark/scheduler/SchedulingMode.scala |  29 ++
 .../apache/spark/scheduler/SparkListener.scala  |   1 -
 .../org/apache/spark/scheduler/StageInfo.scala  |   2 +-
 .../spark/scheduler/TaskDescription.scala       |  37 ++
 .../org/apache/spark/scheduler/TaskInfo.scala   |  72 ++++
 .../apache/spark/scheduler/TaskLocality.scala   |  32 ++
 .../apache/spark/scheduler/TaskScheduler.scala  |   5 +-
 .../spark/scheduler/TaskSchedulerListener.scala |   1 -
 .../apache/spark/scheduler/TaskSetManager.scala |  50 +++
 .../scheduler/cluster/ClusterScheduler.scala    |   2 +-
 .../cluster/ClusterTaskSetManager.scala         |   9 +-
 .../apache/spark/scheduler/cluster/Pool.scala   | 121 -------
 .../spark/scheduler/cluster/Schedulable.scala   |  48 ---
 .../scheduler/cluster/SchedulableBuilder.scala  | 150 --------
 .../scheduler/cluster/SchedulingAlgorithm.scala |  81 -----
 .../scheduler/cluster/SchedulingMode.scala      |  29 --
 .../cluster/StandaloneClusterMessage.scala      |   1 +
 .../cluster/StandaloneSchedulerBackend.scala    |   1 +
 .../scheduler/cluster/TaskDescription.scala     |  37 --
 .../spark/scheduler/cluster/TaskInfo.scala      |  72 ----
 .../spark/scheduler/cluster/TaskLocality.scala  |  32 --
 .../scheduler/cluster/TaskSetManager.scala      |  51 ---
 .../mesos/CoarseMesosSchedulerBackend.scala     | 286 +++++++++++++++
 .../cluster/mesos/MesosSchedulerBackend.scala   | 345 +++++++++++++++++++
 .../spark/scheduler/local/LocalScheduler.scala  |   3 +-
 .../scheduler/local/LocalTaskSetManager.scala   |   4 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     | 286 ---------------
 .../scheduler/mesos/MesosSchedulerBackend.scala | 343 ------------------
 .../apache/spark/ui/UIWorkloadGenerator.scala   |   2 +-
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  |   2 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    |   2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |   4 +-
 .../apache/spark/ui/jobs/JobProgressUI.scala    |   4 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |   3 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   6 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |   3 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   6 +-
 45 files changed, 1280 insertions(+), 1289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6bab1f3..912ce75 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -56,9 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
-  ClusterScheduler, Schedulable, SchedulingMode}
+  ClusterScheduler}
 import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3e3f04f..8a55df4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -28,7 +28,6 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
 import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 0d99670..10ff1b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
-import org.apache.spark.scheduler.cluster.TaskInfo
 import scala.collection.mutable.Map
 
 import org.apache.spark._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index c8b78bf..3628b1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -30,7 +30,6 @@ import scala.io.Source
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
 
 // Used to record runtime information for each job, including RDD graph 
 // tasks' start/stop shuffle information and information from outside

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
new file mode 100644
index 0000000..c9a66b3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.Logging
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
+/**
+ * A Schedulable entity that represents a collection of Pools or TaskSetManagers.
+ */
+
+private[spark] class Pool(
+    val poolName: String,
+    val schedulingMode: SchedulingMode,
+    initMinShare: Int,
+    initWeight: Int)
+  extends Schedulable
+  with Logging {
+
+  var schedulableQueue = new ArrayBuffer[Schedulable]
+  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
+
+  var weight = initWeight
+  var minShare = initMinShare
+  var runningTasks = 0
+
+  var priority = 0
+  var stageId = 0
+  var name = poolName
+  var parent:Schedulable = null
+
+  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+    schedulingMode match {
+      case SchedulingMode.FAIR =>
+        new FairSchedulingAlgorithm()
+      case SchedulingMode.FIFO =>
+        new FIFOSchedulingAlgorithm()
+    }
+  }
+
+  override def addSchedulable(schedulable: Schedulable) {
+    schedulableQueue += schedulable
+    schedulableNameToSchedulable(schedulable.name) = schedulable
+    schedulable.parent= this
+  }
+
+  override def removeSchedulable(schedulable: Schedulable) {
+    schedulableQueue -= schedulable
+    schedulableNameToSchedulable -= schedulable.name
+  }
+
+  override def getSchedulableByName(schedulableName: String): Schedulable = {
+    if (schedulableNameToSchedulable.contains(schedulableName)) {
+      return schedulableNameToSchedulable(schedulableName)
+    }
+    for (schedulable <- schedulableQueue) {
+      var sched = schedulable.getSchedulableByName(schedulableName)
+      if (sched != null) {
+        return sched
+      }
+    }
+    return null
+  }
+
+  override def executorLost(executorId: String, host: String) {
+    schedulableQueue.foreach(_.executorLost(executorId, host))
+  }
+
+  override def checkSpeculatableTasks(): Boolean = {
+    var shouldRevive = false
+    for (schedulable <- schedulableQueue) {
+      shouldRevive |= schedulable.checkSpeculatableTasks()
+    }
+    return shouldRevive
+  }
+
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+    for (schedulable <- sortedSchedulableQueue) {
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
+    }
+    return sortedTaskSetQueue
+  }
+
+  override def increaseRunningTasks(taskNum: Int) {
+    runningTasks += taskNum
+    if (parent != null) {
+      parent.increaseRunningTasks(taskNum)
+    }
+  }
+
+  override def decreaseRunningTasks(taskNum: Int) {
+    runningTasks -= taskNum
+    if (parent != null) {
+      parent.decreaseRunningTasks(taskNum)
+    }
+  }
+
+  override def hasPendingTasks(): Boolean = {
+    schedulableQueue.exists(_.hasPendingTasks())
+  }
+}
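For reference, a minimal usage sketch of the pool hierarchy above (the
class is package-private, so assume this runs from code inside
org.apache.spark.scheduler; names are made up):

    val rootPool = new Pool("root", SchedulingMode.FAIR, 0, 1)
    val childPool = new Pool("child", SchedulingMode.FIFO, 0, 1)
    rootPool.addSchedulable(childPool)   // sets childPool.parent = rootPool

    childPool.increaseRunningTasks(3)
    assert(rootPool.runningTasks == 3)   // counts propagate up the parent links
    assert(rootPool.getSchedulableByName("child") eq childPool)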

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
new file mode 100644
index 0000000..857adae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
+import scala.collection.mutable.ArrayBuffer
+/**
+ * An interface for schedulable entities.
+ * There are two types of Schedulable entities: Pools and TaskSetManagers.
+ */
+private[spark] trait Schedulable {
+  var parent: Schedulable
+  // child queues
+  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulingMode: SchedulingMode
+  def weight: Int
+  def minShare: Int
+  def runningTasks: Int
+  def priority: Int
+  def stageId: Int
+  def name: String
+
+  def increaseRunningTasks(taskNum: Int): Unit
+  def decreaseRunningTasks(taskNum: Int): Unit
+  def addSchedulable(schedulable: Schedulable): Unit
+  def removeSchedulable(schedulable: Schedulable): Unit
+  def getSchedulableByName(name: String): Schedulable
+  def executorLost(executorId: String, host: String): Unit
+  def checkSpeculatableTasks(): Boolean
+  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
+  def hasPendingTasks(): Boolean
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
new file mode 100644
index 0000000..4e25086
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}
+
+import org.apache.spark.Logging
+
+import scala.xml.XML
+
+/**
+ * An interface to build the Schedulable tree.
+ * buildPools: builds the tree nodes (pools).
+ * addTaskSetManager: builds the leaf nodes (TaskSetManagers).
+ */
+private[spark] trait SchedulableBuilder {
+  def buildPools()
+  def addTaskSetManager(manager: Schedulable, properties: Properties)
+}
+
+private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
+  extends SchedulableBuilder with Logging {
+
+  override def buildPools() {
+    // nothing
+  }
+
+  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
+    rootPool.addSchedulable(manager)
+  }
+}
+
+private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+  extends SchedulableBuilder with Logging {
+
+  val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+  val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
+  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
+  val DEFAULT_POOL_NAME = "default"
+  val MINIMUM_SHARES_PROPERTY = "minShare"
+  val SCHEDULING_MODE_PROPERTY = "schedulingMode"
+  val WEIGHT_PROPERTY = "weight"
+  val POOL_NAME_PROPERTY = "@name"
+  val POOLS_PROPERTY = "pool"
+  val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
+  val DEFAULT_MINIMUM_SHARE = 0
+  val DEFAULT_WEIGHT = 1
+
+  override def buildPools() {
+    var is: Option[InputStream] = None
+    try {
+      is = Option {
+        schedulerAllocFile.map { f =>
+          new FileInputStream(f)
+        }.getOrElse {
+          getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+        }
+      }
+
+      is.foreach { i => buildFairSchedulerPool(i) }
+    } finally {
+      is.foreach(_.close())
+    }
+
+    // finally create "default" pool
+    buildDefaultPool()
+  }
+
+  private def buildDefaultPool() {
+    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
+      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
+        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+      rootPool.addSchedulable(pool)
+      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+    }
+  }
+
+  private def buildFairSchedulerPool(is: InputStream) {
+    val xml = XML.load(is)
+    for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+      var schedulingMode = DEFAULT_SCHEDULING_MODE
+      var minShare = DEFAULT_MINIMUM_SHARE
+      var weight = DEFAULT_WEIGHT
+
+      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+      if (xmlSchedulingMode != "") {
+        try {
+          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+        } catch {
+          case e: NoSuchElementException =>
+            logWarning("Unrecognized schedulingMode in XML, using default schedulingMode")
+        }
+      }
+
+      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+      if (xmlMinShare != "") {
+        minShare = xmlMinShare.toInt
+      }
+
+      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+      if (xmlWeight != "") {
+        weight = xmlWeight.toInt
+      }
+
+      val pool = new Pool(poolName, schedulingMode, minShare, weight)
+      rootPool.addSchedulable(pool)
+      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        poolName, schedulingMode, minShare, weight))
+    }
+  }
+
+  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
+    var poolName = DEFAULT_POOL_NAME
+    var parentPool = rootPool.getSchedulableByName(poolName)
+    if (properties != null) {
+      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
+      parentPool = rootPool.getSchedulableByName(poolName)
+      if (parentPool == null) {
+        // we will create a new pool that user has configured in app
+        // instead of being defined in xml file
+        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
+          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+        rootPool.addSchedulable(parentPool)
+        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+      }
+    }
+    parentPool.addSchedulable(manager)
+    logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
+  }
+}
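To illustrate the allocation-file format FairSchedulableBuilder parses,
here is a hypothetical fairscheduler.xml processed with the same
scala.xml calls used in buildFairSchedulerPool (pool names and values
below are made up):

    import scala.xml.XML

    val allocations =
      """<allocations>
        |  <pool name="production">
        |    <schedulingMode>FAIR</schedulingMode>
        |    <minShare>2</minShare>
        |    <weight>3</weight>
        |  </pool>
        |  <pool name="test">
        |    <schedulingMode>FIFO</schedulingMode>
        |  </pool>
        |</allocations>""".stripMargin

    val xml = XML.loadString(allocations)
    for (poolNode <- (xml \\ "pool")) {
      // Properties missing from a pool fall back to the DEFAULT_* values above.
      println("pool %s: mode=%s minShare=%s weight=%s".format(
        (poolNode \ "@name").text,
        (poolNode \ "schedulingMode").text,
        (poolNode \ "minShare").text,
        (poolNode \ "weight").text))
    }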

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
new file mode 100644
index 0000000..3418640
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * An interface for sorting algorithms.
+ * FIFO: FIFO algorithm between TaskSetManagers.
+ * FS: fair-sharing algorithm between Pools, and FIFO or FS within Pools.
+ */
+private[spark] trait SchedulingAlgorithm {
+  def comparator(s1: Schedulable, s2: Schedulable): Boolean
+}
+
+private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
+  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
+    val priority1 = s1.priority
+    val priority2 = s2.priority
+    var res = math.signum(priority1 - priority2)
+    if (res == 0) {
+      val stageId1 = s1.stageId
+      val stageId2 = s2.stageId
+      res = math.signum(stageId1 - stageId2)
+    }
+    if (res < 0) {
+      return true
+    } else {
+      return false
+    }
+  }
+}
+
+private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
+  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
+    val minShare1 = s1.minShare
+    val minShare2 = s2.minShare
+    val runningTasks1 = s1.runningTasks
+    val runningTasks2 = s2.runningTasks
+    val s1Needy = runningTasks1 < minShare1
+    val s2Needy = runningTasks2 < minShare2
+    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
+    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
+    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
+    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
+    var res:Boolean = true
+    var compare:Int = 0
+
+    if (s1Needy && !s2Needy) {
+      return true
+    } else if (!s1Needy && s2Needy) {
+      return false
+    } else if (s1Needy && s2Needy) {
+      compare = minShareRatio1.compareTo(minShareRatio2)
+    } else {
+      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
+    }
+
+    if (compare < 0) {
+      return true
+    } else if (compare > 0) {
+      return false
+    } else {
+      return s1.name < s2.name
+    }
+  }
+}
+
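A small worked example of the fair comparator (it reuses the Pool class
from this commit, so again assume code inside org.apache.spark.scheduler):

    val fair = new FairSchedulingAlgorithm

    val needy = new Pool("needy", SchedulingMode.FIFO, 4, 1)  // minShare = 4
    val busy  = new Pool("busy",  SchedulingMode.FIFO, 1, 1)  // minShare = 1
    needy.increaseRunningTasks(1)  // 1 < 4, below its minimum share
    busy.increaseRunningTasks(5)   // 5 >= 1, share satisfied

    // A schedulable below its minShare always sorts ahead of one that
    // has met its share.
    assert(fair.comparator(needy, busy))
    assert(!fair.comparator(busy, needy))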

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
new file mode 100644
index 0000000..0a786de
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ *  "FAIR" and "FIFO" determine which policy is used
+ *    to order tasks amongst a Schedulable's sub-queues.
+ *  "NONE" is used when a Schedulable has no sub-queues.
+ */
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
+
+  type SchedulingMode = Value
+  val FAIR,FIFO,NONE = Value
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c3cf4b8..62b521a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
-import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.{Logging, SparkContext, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 72cb1c9..b6f1196 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.scheduler.cluster.TaskInfo
 import scala.collection._
+
 import org.apache.spark.executor.TaskMetrics
 
 case class StageInfo(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
new file mode 100644
index 0000000..5190d23
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import org.apache.spark.util.SerializableBuffer
+
+private[spark] class TaskDescription(
+    val taskId: Long,
+    val executorId: String,
+    val name: String,
+    val index: Int,    // Index within this task's TaskSet
+    _serializedTask: ByteBuffer)
+  extends Serializable {
+
+  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
+  private val buffer = new SerializableBuffer(_serializedTask)
+
+  def serializedTask: ByteBuffer = buffer.value
+
+  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+}
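The wrapping is needed because java.nio.ByteBuffer does not implement
java.io.Serializable, so a raw buffer would fail Java serialization
when the description is shipped to an executor. A quick check:

    import java.nio.ByteBuffer

    val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3))
    println(buf.isInstanceOf[java.io.Serializable])  // prints: false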

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
new file mode 100644
index 0000000..7c2a422
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.util.Utils
+
+/**
+ * Information about a running task attempt inside a TaskSet.
+ */
+private[spark]
+class TaskInfo(
+    val taskId: Long,
+    val index: Int,
+    val launchTime: Long,
+    val executorId: String,
+    val host: String,
+    val taskLocality: TaskLocality.TaskLocality) {
+
+  var finishTime: Long = 0
+  var failed = false
+
+  def markSuccessful(time: Long = System.currentTimeMillis) {
+    finishTime = time
+  }
+
+  def markFailed(time: Long = System.currentTimeMillis) {
+    finishTime = time
+    failed = true
+  }
+
+  def finished: Boolean = finishTime != 0
+
+  def successful: Boolean = finished && !failed
+
+  def running: Boolean = !finished
+
+  def status: String = {
+    if (running)
+      "RUNNING"
+    else if (failed)
+      "FAILED"
+    else if (successful)
+      "SUCCESS"
+    else
+      "UNKNOWN"
+  }
+
+  def duration: Long = {
+    if (!finished) {
+      throw new UnsupportedOperationException("duration() called on unfinished tasks")
+    } else {
+      finishTime - launchTime
+    }
+  }
+
+  def timeRunning(currentTime: Long): Long = currentTime - launchTime
+}
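A sketch of the attempt lifecycle this class tracks (package-private, so
assume code inside org.apache.spark.scheduler; the ids and times here
are made up):

    val info = new TaskInfo(0L, 0, 1000L, "exec-1", "host-1", TaskLocality.NODE_LOCAL)
    assert(info.running && info.status == "RUNNING")

    info.markSuccessful(1500L)     // records finishTime = 1500
    assert(info.successful && info.status == "SUCCESS")
    assert(info.duration == 500L)  // finishTime - launchTime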

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
new file mode 100644
index 0000000..47b0f38
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+
+private[spark] object TaskLocality
+  extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
+{
+  // Process-local is expected to be used ONLY within a TaskSetManager for now.
+  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
+
+  type TaskLocality = Value
+
+  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
+    condition <= constraint
+  }
+}
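Because Enumeration assigns ids in declaration order, PROCESS_LOCAL <
NODE_LOCAL < RACK_LOCAL < ANY, and isAllowed(constraint, condition)
simply checks condition <= constraint. For example (the object is
package-private, so assume code inside the spark package):

    assert(TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.PROCESS_LOCAL))
    assert(TaskLocality.isAllowed(TaskLocality.ANY, TaskLocality.RACK_LOCAL))
    assert(!TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.ANY))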

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 63be8ba..7c2a9f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.scheduler.cluster.Pool
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
 /**
  * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
+ * Each TaskScheduler schedules tasks for a single SparkContext.
  * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
  * and are responsible for sending the tasks to the cluster, running them, retrying if there
  * are failures, and mitigating stragglers. They return events to the DAGScheduler through

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
index 83be051..593fa9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.scheduler.cluster.TaskInfo
 import scala.collection.mutable.Map
 
 import org.apache.spark.TaskEndReason

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
new file mode 100644
index 0000000..f192b0b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.TaskState.TaskState
+
+/**
+ * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
+ * each task and is responsible for retries on failure and locality. The main interfaces to it
+ * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
+ * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
+ * (e.g. its event handlers). It should not be called from other threads.
+ */
+private[spark] trait TaskSetManager extends Schedulable {
+  def schedulableQueue = null
+  
+  def schedulingMode = SchedulingMode.NONE
+  
+  def taskSet: TaskSet
+
+  def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription]
+
+  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
+
+  def error(message: String)
+}
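To make the resourceOffer contract concrete, here is a hypothetical
driver loop (not part of this commit) that offers one executor's free
cores to each manager in sorted order, launching at most one task per
manager per pass:

    def makeOffers(
        managers: Seq[TaskSetManager],
        execId: String,
        host: String,
        cores: Int): Seq[TaskDescription] = {
      val launched = new scala.collection.mutable.ArrayBuffer[TaskDescription]
      var freeCores = cores
      for (manager <- managers if freeCores > 0) {
        manager.resourceOffer(execId, host, freeCores, TaskLocality.ANY) match {
          case Some(task) =>
            launched += task
            freeCores -= 1
          case None =>  // nothing runnable at this locality level
        }
      }
      launched
    }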

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 919acce..a6dee60 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.HashSet
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{TimerTask, Timer}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 0ac3d7b..411e49b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 import scala.math.max
 import scala.math.min
+import scala.Some
 
-import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState}
-import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
+import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
+  SparkException, Success, TaskEndReason, TaskResultTooBigFailure, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler._
-import scala.Some
-import org.apache.spark.FetchFailed
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.TaskResultTooBigFailure
 import org.apache.spark.util.{SystemClock, Clock}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala
deleted file mode 100644
index 35b3260..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-
-import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
-
-/**
- * A Schedulable entity that represents a collection of Pools or TaskSetManagers.
- */
-
-private[spark] class Pool(
-    val poolName: String,
-    val schedulingMode: SchedulingMode,
-    initMinShare: Int,
-    initWeight: Int)
-  extends Schedulable
-  with Logging {
-
-  var schedulableQueue = new ArrayBuffer[Schedulable]
-  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
-
-  var weight = initWeight
-  var minShare = initMinShare
-  var runningTasks = 0
-
-  var priority = 0
-  var stageId = 0
-  var name = poolName
-  var parent:Schedulable = null
-
-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
-    schedulingMode match {
-      case SchedulingMode.FAIR =>
-        new FairSchedulingAlgorithm()
-      case SchedulingMode.FIFO =>
-        new FIFOSchedulingAlgorithm()
-    }
-  }
-
-  override def addSchedulable(schedulable: Schedulable) {
-    schedulableQueue += schedulable
-    schedulableNameToSchedulable(schedulable.name) = schedulable
-    schedulable.parent= this
-  }
-
-  override def removeSchedulable(schedulable: Schedulable) {
-    schedulableQueue -= schedulable
-    schedulableNameToSchedulable -= schedulable.name
-  }
-
-  override def getSchedulableByName(schedulableName: String): Schedulable = {
-    if (schedulableNameToSchedulable.contains(schedulableName)) {
-      return schedulableNameToSchedulable(schedulableName)
-    }
-    for (schedulable <- schedulableQueue) {
-      var sched = schedulable.getSchedulableByName(schedulableName)
-      if (sched != null) {
-        return sched
-      }
-    }
-    return null
-  }
-
-  override def executorLost(executorId: String, host: String) {
-    schedulableQueue.foreach(_.executorLost(executorId, host))
-  }
-
-  override def checkSpeculatableTasks(): Boolean = {
-    var shouldRevive = false
-    for (schedulable <- schedulableQueue) {
-      shouldRevive |= schedulable.checkSpeculatableTasks()
-    }
-    return shouldRevive
-  }
-
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
-    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
-    for (schedulable <- sortedSchedulableQueue) {
-      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
-    }
-    return sortedTaskSetQueue
-  }
-
-  override def increaseRunningTasks(taskNum: Int) {
-    runningTasks += taskNum
-    if (parent != null) {
-      parent.increaseRunningTasks(taskNum)
-    }
-  }
-
-  override def decreaseRunningTasks(taskNum: Int) {
-    runningTasks -= taskNum
-    if (parent != null) {
-      parent.decreaseRunningTasks(taskNum)
-    }
-  }
-
-  override def hasPendingTasks(): Boolean = {
-    schedulableQueue.exists(_.hasPendingTasks())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala
deleted file mode 100644
index f472645..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
-
-import scala.collection.mutable.ArrayBuffer
-/**
- * An interface for schedulable entities.
- * There are two types of Schedulable entities: Pools and TaskSetManagers.
- */
-private[spark] trait Schedulable {
-  var parent: Schedulable
-  // child queues
-  def schedulableQueue: ArrayBuffer[Schedulable]
-  def schedulingMode: SchedulingMode
-  def weight: Int
-  def minShare: Int
-  def runningTasks: Int
-  def priority: Int
-  def stageId: Int
-  def name: String
-
-  def increaseRunningTasks(taskNum: Int): Unit
-  def decreaseRunningTasks(taskNum: Int): Unit
-  def addSchedulable(schedulable: Schedulable): Unit
-  def removeSchedulable(schedulable: Schedulable): Unit
-  def getSchedulableByName(name: String): Schedulable
-  def executorLost(executorId: String, host: String): Unit
-  def checkSpeculatableTasks(): Boolean
-  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
-  def hasPendingTasks(): Boolean
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
deleted file mode 100644
index 114617c..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.io.{FileInputStream, InputStream}
-import java.util.{NoSuchElementException, Properties}
-
-import org.apache.spark.Logging
-
-import scala.xml.XML
-
-/**
- * An interface to build the Schedulable tree.
- * buildPools: builds the tree nodes (pools).
- * addTaskSetManager: builds the leaf nodes (TaskSetManagers).
- */
-private[spark] trait SchedulableBuilder {
-  def buildPools()
-  def addTaskSetManager(manager: Schedulable, properties: Properties)
-}
-
-private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
-  extends SchedulableBuilder with Logging {
-
-  override def buildPools() {
-    // nothing
-  }
-
-  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
-    rootPool.addSchedulable(manager)
-  }
-}
-
-private[spark] class FairSchedulableBuilder(val rootPool: Pool)
-  extends SchedulableBuilder with Logging {
-
-  val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
-  val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
-  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
-  val DEFAULT_POOL_NAME = "default"
-  val MINIMUM_SHARES_PROPERTY = "minShare"
-  val SCHEDULING_MODE_PROPERTY = "schedulingMode"
-  val WEIGHT_PROPERTY = "weight"
-  val POOL_NAME_PROPERTY = "@name"
-  val POOLS_PROPERTY = "pool"
-  val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
-  val DEFAULT_MINIMUM_SHARE = 0
-  val DEFAULT_WEIGHT = 1
-
-  override def buildPools() {
-    var is: Option[InputStream] = None
-    try {
-      is = Option {
-        schedulerAllocFile.map { f =>
-          new FileInputStream(f)
-        }.getOrElse {
-          getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
-        }
-      }
-
-      is.foreach { i => buildFairSchedulerPool(i) }
-    } finally {
-      is.foreach(_.close())
-    }
-
-    // finally create "default" pool
-    buildDefaultPool()
-  }
-
-  private def buildDefaultPool() {
-    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
-      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
-        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
-      rootPool.addSchedulable(pool)
-      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-        DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
-    }
-  }
-
-  private def buildFairSchedulerPool(is: InputStream) {
-    val xml = XML.load(is)
-    for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
-      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-      var schedulingMode = DEFAULT_SCHEDULING_MODE
-      var minShare = DEFAULT_MINIMUM_SHARE
-      var weight = DEFAULT_WEIGHT
-
-      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-      if (xmlSchedulingMode != "") {
-        try {
-          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-        } catch {
-          case e: NoSuchElementException =>
-            logWarning("Unrecognized schedulingMode in XML, using default schedulingMode")
-        }
-      }
-
-      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-      if (xmlMinShare != "") {
-        minShare = xmlMinShare.toInt
-      }
-
-      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
-      if (xmlWeight != "") {
-        weight = xmlWeight.toInt
-      }
-
-      val pool = new Pool(poolName, schedulingMode, minShare, weight)
-      rootPool.addSchedulable(pool)
-      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-        poolName, schedulingMode, minShare, weight))
-    }
-  }
-
-  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
-    var poolName = DEFAULT_POOL_NAME
-    var parentPool = rootPool.getSchedulableByName(poolName)
-    if (properties != null) {
-      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
-      parentPool = rootPool.getSchedulableByName(poolName)
-      if (parentPool == null) {
-        // we will create a new pool that user has configured in app
-        // instead of being defined in xml file
-        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
-          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
-        rootPool.addSchedulable(parentPool)
-        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
-      }
-    }
-    parentPool.addSchedulable(manager)
-    logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala
deleted file mode 100644
index cbeed47..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-/**
- * An interface for sorting algorithms.
- * FIFO: FIFO algorithm between TaskSetManagers.
- * FS: fair-sharing algorithm between Pools, and FIFO or FS within Pools.
- */
-private[spark] trait SchedulingAlgorithm {
-  def comparator(s1: Schedulable, s2: Schedulable): Boolean
-}
-
-private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
-  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
-    val priority1 = s1.priority
-    val priority2 = s2.priority
-    var res = math.signum(priority1 - priority2)
-    if (res == 0) {
-      val stageId1 = s1.stageId
-      val stageId2 = s2.stageId
-      res = math.signum(stageId1 - stageId2)
-    }
-    if (res < 0) {
-      return true
-    } else {
-      return false
-    }
-  }
-}
-
-private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
-  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
-    val minShare1 = s1.minShare
-    val minShare2 = s2.minShare
-    val runningTasks1 = s1.runningTasks
-    val runningTasks2 = s2.runningTasks
-    val s1Needy = runningTasks1 < minShare1
-    val s2Needy = runningTasks2 < minShare2
-    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
-    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
-    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
-    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
-    var res:Boolean = true
-    var compare:Int = 0
-
-    if (s1Needy && !s2Needy) {
-      return true
-    } else if (!s1Needy && s2Needy) {
-      return false
-    } else if (s1Needy && s2Needy) {
-      compare = minShareRatio1.compareTo(minShareRatio2)
-    } else {
-      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
-    }
-
-    if (compare < 0) {
-      return true
-    } else if (compare > 0) {
-      return false
-    } else {
-      return s1.name < s2.name
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
deleted file mode 100644
index 3481138..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-/**
- * "FAIR" and "FIFO" determine which policy is used
- * to order tasks amongst a Schedulable's sub-queues;
- * "NONE" is used when a Schedulable has no sub-queues.
- */
-object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
-
-  type SchedulingMode = Value
-  val FAIR,FIFO,NONE = Value
-}
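
The strings handed to Enumeration above are what make the mode parseable from
configuration; the cluster scheduler reads it from the spark.scheduler.mode
system property. A small sketch (defaulting to FIFO, as the scheduler does):

    import org.apache.spark.scheduler.SchedulingMode  // its new home after this change

    val mode: SchedulingMode.SchedulingMode =
      SchedulingMode.withName(System.getProperty("spark.scheduler.mode", "FIFO"))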

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 9c36d22..c0b836b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.nio.ByteBuffer
 
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{Utils, SerializableBuffer}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b4ea0be..f3aeea4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -29,6 +29,7 @@ import akka.util.Duration
 import akka.util.duration._
 
 import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala
deleted file mode 100644
index 309ac2f..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.nio.ByteBuffer
-import org.apache.spark.util.SerializableBuffer
-
-private[spark] class TaskDescription(
-    val taskId: Long,
-    val executorId: String,
-    val name: String,
-    val index: Int,    // Index within this task's TaskSet
-    _serializedTask: ByteBuffer)
-  extends Serializable {
-
-  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
-  private val buffer = new SerializableBuffer(_serializedTask)
-
-  def serializedTask: ByteBuffer = buffer.value
-
-  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
-}
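
The SerializableBuffer wrapping above matters because a TaskDescription is
shipped to an executor inside an actor message. A minimal construction sketch
(IDs, names and bytes are made up for illustration):

    import java.nio.ByteBuffer
    import org.apache.spark.scheduler.TaskDescription  // its new home after this change

    // Stand-in for a serialized task; the real bytes come from the task serializer.
    val taskBytes = ByteBuffer.wrap(Array[Byte](1, 2, 3))
    val desc = new TaskDescription(42L, "exec-1", "task 42", 0, taskBytes)

    println(desc)                                // TaskDescription(TID=42, index=0)
    assert(desc.serializedTask.remaining() == 3) // wrapped buffer survives serialization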

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
deleted file mode 100644
index 9685fb1..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.util.Utils
-
-/**
- * Information about a running task attempt inside a TaskSet.
- */
-private[spark]
-class TaskInfo(
-    val taskId: Long,
-    val index: Int,
-    val launchTime: Long,
-    val executorId: String,
-    val host: String,
-    val taskLocality: TaskLocality.TaskLocality) {
-
-  var finishTime: Long = 0
-  var failed = false
-
-  def markSuccessful(time: Long = System.currentTimeMillis) {
-    finishTime = time
-  }
-
-  def markFailed(time: Long = System.currentTimeMillis) {
-    finishTime = time
-    failed = true
-  }
-
-  def finished: Boolean = finishTime != 0
-
-  def successful: Boolean = finished && !failed
-
-  def running: Boolean = !finished
-
-  def status: String = {
-    if (running)
-      "RUNNING"
-    else if (failed)
-      "FAILED"
-    else if (successful)
-      "SUCCESS"
-    else
-      "UNKNOWN"
-  }
-
-  def duration: Long = {
-    if (!finished) {
-      throw new UnsupportedOperationException("duration() called on unfinished tasks")
-    } else {
-      finishTime - launchTime
-    }
-  }
-
-  def timeRunning(currentTime: Long): Long = currentTime - launchTime
-}
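
The lifecycle above is: construct at launch time, then call exactly one of
markSuccessful or markFailed. A short sketch of how a caller drives it
(IDs, hosts and timestamps are illustrative):

    import org.apache.spark.scheduler.{TaskInfo, TaskLocality}  // new homes after this change

    val info = new TaskInfo(taskId = 7L, index = 0, launchTime = 1000L,
      executorId = "exec-1", host = "host-1", taskLocality = TaskLocality.NODE_LOCAL)

    assert(info.running && info.status == "RUNNING")
    info.markSuccessful(time = 1500L)                 // or markFailed(...) on error
    assert(info.successful && info.duration == 500L)  // finishTime - launchTime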

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
deleted file mode 100644
index 5d4130e..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-
-private[spark] object TaskLocality
-  extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
-{
-  // PROCESS_LOCAL is expected to be used ONLY within a TaskSetManager for now.
-  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
-
-  type TaskLocality = Value
-
-  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
-    condition <= constraint
-  }
-}
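
Because the values above are declared in increasing order of distance, the <=
in isAllowed reads as "the task's locality level is at least as local as the
constraint". For example:

    import org.apache.spark.scheduler.TaskLocality  // its new home after this change

    assert(TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.PROCESS_LOCAL))
    assert(!TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.RACK_LOCAL))
    assert(TaskLocality.isAllowed(TaskLocality.ANY, TaskLocality.RACK_LOCAL)) // ANY allows all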

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
deleted file mode 100644
index 648a3ef..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.TaskSet
-
-/**
- * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
- * each task and is responsible for retries on failure and locality. The main interfaces to it
- * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
- * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
- *
- * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
- * (e.g. its event handlers). It should not be called from other threads.
- */
-private[spark] trait TaskSetManager extends Schedulable {
-  def schedulableQueue = null
-  
-  def schedulingMode = SchedulingMode.NONE
-  
-  def taskSet: TaskSet
-
-  def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription]
-
-  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
-
-  def error(message: String)
-}
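
The resourceOffer/statusUpdate contract described in the comment above is
driven by the scheduler while it holds its own lock. A sketch of the calling
side (the method and its parameters are placeholders, not real scheduler code):

    import org.apache.spark.scheduler.{TaskLocality, TaskSetManager}

    def offerTo(manager: TaskSetManager, execId: String, host: String, cores: Int) {
      manager.resourceOffer(execId, host, cores, TaskLocality.ANY) match {
        case Some(task) => println("launching " + task) // hand the TaskDescription to a backend
        case None       =>                              // the manager declined this offer
      }
    }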

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d85fe41b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
new file mode 100644
index 0000000..8f2eef9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+import com.google.protobuf.ByteString
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * StandaloneBackend mechanism. Holding the nodes makes task-launch latency lower and more
+ * predictable.
+ *
+ * Unfortunately this duplicates a bit of code from MesosSchedulerBackend, but the duplication
+ * seems hard to remove.
+ */
+private[spark] class CoarseMesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    appName: String)
+  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  with MScheduler
+  with Logging {
+
+  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+  // Cores we have acquired with each Mesos task ID
+  val coresByTaskId = new HashMap[Int, Int]
+  var totalCoresAcquired = 0
+
+  val slaveIdsWithExecutors = new HashSet[String]
+
+  val taskIdToSlaveId = new HashMap[Int, String]
+  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+
+  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+    "Spark home is not set; set it through the spark.home system " +
+    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+
+  val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+
+  var nextMesosTaskId = 0
+
+  def newMesosTaskId(): Int = {
+    val id = nextMesosTaskId
+    nextMesosTaskId += 1
+    id
+  }
+
+  override def start() {
+    super.start()
+
+    synchronized {
+      new Thread("CoarseMesosSchedulerBackend driver") {
+        setDaemon(true)
+        override def run() {
+          val scheduler = CoarseMesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+          try {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      waitForRegister()
+    }
+  }
+
+  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+    val environment = Environment.newBuilder()
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
+    }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"),
+      System.getProperty("spark.driver.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      val runScript = new File(sparkHome, "spark-class").getCanonicalPath
+      command.setValue(
+        "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
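+      // e.g. a URI ending in "spark-0.8.0.tar.gz" yields the basename "spark-0",
+      // and "cd spark-0*" below lands in whatever directory the archive unpacked to.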
+      val basename = uri.split('/').last.split('.').head
+      command.setValue(
+        "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+          basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    return command.build()
+  }
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    logInfo("Registered as framework ID " + frameworkId.getValue)
+    registeredLock.synchronized {
+      isRegistered = true
+      registeredLock.notifyAll()
+    }
+  }
+
+  def waitForRegister() {
+    registeredLock.synchronized {
+      while (!isRegistered) {
+        registeredLock.wait()
+      }
+    }
+  }
+
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+   * unless we've already launched more than we wanted to.
+   */
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    synchronized {
+      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+      for (offer <- offers) {
+        // Use the ID's value so it matches the slaveIdsWithExecutors keys checked in slaveLost()
+        val slaveId = offer.getSlaveId.getValue
+        val mem = getResource(offer.getResourcesList, "mem")
+        val cpus = getResource(offer.getResourcesList, "cpus").toInt
+        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+            !slaveIdsWithExecutors.contains(slaveId)) {
+          // Launch an executor on the slave
+          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          val taskId = newMesosTaskId()
+          taskIdToSlaveId(taskId) = slaveId
+          slaveIdsWithExecutors += slaveId
+          coresByTaskId(taskId) = cpusToUse
+          val task = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+            .setName("Task " + taskId)
+            .addResources(createResource("cpus", cpusToUse))
+            .addResources(createResource("mem", executorMemory))
+            .build()
+          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
+        } else {
+          // Filter it out
+          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
+        }
+      }
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  private def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Build a Mesos resource protobuf object */
+  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  private def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+      state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val taskId = status.getTaskId.getValue.toInt
+    val state = status.getState
+    logInfo("Mesos task " + taskId + " is now " + state)
+    synchronized {
+      if (isFinished(state)) {
+        val slaveId = taskIdToSlaveId(taskId)
+        slaveIdsWithExecutors -= slaveId
+        taskIdToSlaveId -= taskId
+        // Remove the cores we have remembered for this task, if it's in the hashmap
+        for (cores <- coresByTaskId.get(taskId)) {
+          totalCoresAcquired -= cores
+          coresByTaskId -= taskId
+        }
+        // If it was a failure, mark the slave as failed for blacklisting purposes
+        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+                "is Spark installed on it?")
+          }
+        }
+        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+      }
+    }
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    logError("Mesos error: " + message)
+    scheduler.error(message)
+  }
+
+  override def stop() {
+    super.stop()
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    synchronized {
+      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
+        // Note that the slave ID corresponds to the executor ID on that slave
+        slaveIdsWithExecutors -= slaveId.getValue
+        removeExecutor(slaveId.getValue, "Mesos slave lost")
+      }
+    }
+  }
+
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+    slaveLost(d, s)
+  }
+}
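
All of the knobs this backend reads are plain system properties, so selecting
and bounding it can be sketched from application code as follows (property
values, host names and the executor URI are illustrative; spark.mesos.coarse
is the flag SparkContext checks when it picks a backend):

    import org.apache.spark.SparkContext

    System.setProperty("spark.mesos.coarse", "true")    // pick this backend over fine-grained Mesos
    System.setProperty("spark.cores.max", "16")         // cap on totalCoresAcquired above
    System.setProperty("spark.mesos.extra.cores", "1")  // executors advertise cores beyond those reserved
    System.setProperty("spark.executor.uri",
      "hdfs://namenode:9000/dists/spark.tgz")           // made-up URI; without it spark.home is used

    val sc = new SparkContext("mesos://master:5050", "CoarseGrainedExample")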