Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/27 00:01:58 UTC

[08/11] Merge pull request #14 from kayousterhout/untangle_scheduler

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
new file mode 100644
index 0000000..50cbc2c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+import com.google.protobuf.ByteString
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
+import org.apache.spark.util.Utils
+
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
+private[spark] class MesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    appName: String)
+  extends SchedulerBackend
+  with MScheduler
+  with Logging {
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Which slave IDs we have executors on
+  val slaveIdsWithExecutors = new HashSet[String]
+  val taskIdToSlaveId = new HashMap[Long, String]
+
+  // The serialized executor argument to pass to Mesos, built lazily by createExecArg()
+  var execArgs: Array[Byte] = null
+
+  var classLoader: ClassLoader = null
+
+  override def start() {
+    synchronized {
+      classLoader = Thread.currentThread.getContextClassLoader
+
+      new Thread("MesosSchedulerBackend driver") {
+        setDaemon(true)
+        override def run() {
+          val scheduler = MesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+          try {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      waitForRegister()
+    }
+  }
+
+  def createExecutorInfo(execId: String): ExecutorInfo = {
+    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+      "Spark home is not set; set it through the spark.home system " +
+      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+    val environment = Environment.newBuilder()
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
+    }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.split('/').last.split('.').head
+      command.setValue("cd %s*; ./spark-executor".format(basename))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    val memory = Resource.newBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+      .build()
+    ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
+      .setCommand(command)
+      .setData(ByteString.copyFrom(createExecArg()))
+      .addResources(memory)
+      .build()
+  }
+
+  /**
+   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
+   * containing all the spark.* system properties in the form of (String, String) pairs.
+   */
+  private def createExecArg(): Array[Byte] = {
+    if (execArgs == null) {
+      val props = new HashMap[String, String]
+      val iterator = System.getProperties.entrySet.iterator
+      while (iterator.hasNext) {
+        val entry = iterator.next
+        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+        if (key.startsWith("spark.")) {
+          props(key) = value
+        }
+      }
+      // Serialize the map as an array of (String, String) pairs
+      execArgs = Utils.serialize(props.toArray)
+    }
+    return execArgs
+  }
+
+  private def setClassLoader(): ClassLoader = {
+    val oldClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(classLoader)
+    return oldClassLoader
+  }
+
+  private def restoreClassLoader(oldClassLoader: ClassLoader) {
+    Thread.currentThread.setContextClassLoader(oldClassLoader)
+  }
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    val oldClassLoader = setClassLoader()
+    try {
+      logInfo("Registered as framework ID " + frameworkId.getValue)
+      registeredLock.synchronized {
+        isRegistered = true
+        registeredLock.notifyAll()
+      }
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  def waitForRegister() {
+    registeredLock.synchronized {
+      while (!isRegistered) {
+        registeredLock.wait()
+      }
+    }
+  }
+
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
+   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
+   * tasks are balanced across the cluster.
+   */
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    val oldClassLoader = setClassLoader()
+    try {
+      synchronized {
+        // Build a big list of the offerable workers, and remember their indices so that we can
+        // figure out which Offer to reply to for each worker
+        val offerableIndices = new ArrayBuffer[Int]
+        val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+        def enoughMemory(o: Offer) = {
+          val mem = getResource(o.getResourcesList, "mem")
+          val slaveId = o.getSlaveId.getValue
+          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+        }
+
+        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+          offerableIndices += index
+          offerableWorkers += new WorkerOffer(
+            offer.getSlaveId.getValue,
+            offer.getHostname,
+            getResource(offer.getResourcesList, "cpus").toInt)
+        }
+
+        // Call into the ClusterScheduler
+        val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+        // Build a list of Mesos tasks for each slave
+        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+        for ((taskList, index) <- taskLists.zipWithIndex) {
+          if (!taskList.isEmpty) {
+            val offerNum = offerableIndices(index)
+            val slaveId = offers(offerNum).getSlaveId.getValue
+            slaveIdsWithExecutors += slaveId
+            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+            for (taskDesc <- taskList) {
+              taskIdToSlaveId(taskDesc.taskId) = slaveId
+              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+            }
+          }
+        }
+
+        // Reply to the offers
+        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+        for (i <- 0 until offers.size) {
+          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+        }
+      }
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Turn a Spark TaskDescription into a Mesos task */
+  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
+    val cpuResource = Resource.newBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+      .build()
+    return MesosTaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setExecutor(createExecutorInfo(slaveId))
+      .setName(task.name)
+      .addResources(cpuResource)
+      .setData(ByteString.copyFrom(task.serializedTask))
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+      state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val oldClassLoader = setClassLoader()
+    try {
+      val tid = status.getTaskId.getValue.toLong
+      val state = TaskState.fromMesos(status.getState)
+      synchronized {
+        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+          // We lost the executor on this slave, so remember that it's gone
+          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+        }
+        if (isFinished(status.getState)) {
+          taskIdToSlaveId.remove(tid)
+        }
+      }
+      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    val oldClassLoader = setClassLoader()
+    try {
+      logError("Mesos error: " + message)
+      scheduler.error(message)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  override def stop() {
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def reviveOffers() {
+    driver.reviveOffers()
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+    val oldClassLoader = setClassLoader()
+    try {
+      logInfo("Mesos slave lost: " + slaveId.getValue)
+      synchronized {
+        slaveIdsWithExecutors -= slaveId.getValue
+      }
+      scheduler.executorLost(slaveId.getValue, reason)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    recordSlaveLost(d, slaveId, SlaveLost())
+  }
+
+  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+                            slaveId: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+                                                                 slaveId.getValue))
+    recordSlaveLost(d, slaveId, ExecutorExited(status))
+  }
+
+  // TODO: query Mesos for number of cores
+  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+}
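
For reference, a minimal standalone sketch of the basename globbing that
createExecutorInfo above applies to spark.executor.uri; the URI is
illustrative, not taken from the diff:

object BasenameGlobSketch extends App {
  val uri = "http://example.com/dist/spark-0.8.0.tgz"  // hypothetical URI
  // Take the file name, then everything before the first '.':
  // "spark-0.8.0.tgz" -> "spark-0"
  val basename = uri.split('/').last.split('.').head
  // Mesos fetches and unpacks the archive into the task sandbox, so a
  // shell glob on that prefix locates the extracted directory:
  println("cd %s*; ./spark-executor".format(basename))  // cd spark-0*; ./spark-executor
}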

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 8cb4d13..e29438f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -31,8 +31,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import akka.actor._
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index e52cb99..a2fda4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -23,8 +23,8 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.{Task, TaskResult, TaskSet}
-import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
+import org.apache.spark.scheduler.{Schedulable, Task, TaskDescription, TaskInfo, TaskLocality,
+  TaskResult, TaskSet, TaskSetManager}
 
 
 private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index 3dbe61d..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import org.apache.spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import org.apache.spark.TaskState
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
-    scheduler: ClusterScheduler,
-    sc: SparkContext,
-    master: String,
-    appName: String)
-  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
-  with MScheduler
-  with Logging {
-
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
-
-  // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[Int, Int]
-  var totalCoresAcquired = 0
-
-  val slaveIdsWithExecutors = new HashSet[String]
-
-  val taskIdToSlaveId = new HashMap[Int, String]
-  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
-  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-    "Spark home is not set; set it through the spark.home system " +
-    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-
-  val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
-
-  var nextMesosTaskId = 0
-
-  def newMesosTaskId(): Int = {
-    val id = nextMesosTaskId
-    nextMesosTaskId += 1
-    id
-  }
-
-  override def start() {
-    super.start()
-
-    synchronized {
-      new Thread("CoarseMesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = CoarseMesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          }
-          catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
-  }
-
-  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
-      System.getProperty("spark.driver.host"),
-      System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
-    val uri = System.getProperty("spark.executor.uri")
-    if (uri == null) {
-      val runScript = new File(sparkHome, "spark-class").getCanonicalPath
-      command.setValue(
-        "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
-      command.setValue(
-        "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-          basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
-    }
-    return command.build()
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    logInfo("Registered as framework ID " + frameworkId.getValue)
-    registeredLock.synchronized {
-      isRegistered = true
-      registeredLock.notifyAll()
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
-   * unless we've already launched more than we wanted to.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
-      for (offer <- offers) {
-        val slaveId = offer.getSlaveId.toString
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId(taskId) = slaveId
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          val task = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
-            .setName("Task " + taskId)
-            .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", executorMemory))
-            .build()
-          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
-        } else {
-          // Filter it out
-          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
-        }
-      }
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  private def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Build a Mesos resource protobuf object */
-  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  private def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val taskId = status.getTaskId.getValue.toInt
-    val state = status.getState
-    logInfo("Mesos task " + taskId + " is now " + state)
-    synchronized {
-      if (isFinished(state)) {
-        val slaveId = taskIdToSlaveId(taskId)
-        slaveIdsWithExecutors -= slaveId
-        taskIdToSlaveId -= taskId
-        // Remove the cores we have remembered for this task, if it's in the hashmap
-        for (cores <- coresByTaskId.get(taskId)) {
-          totalCoresAcquired -= cores
-          coresByTaskId -= taskId
-        }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
-        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
-          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
-          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
-            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
-                "is Spark installed on it?")
-          }
-        }
-        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
-      }
-    }
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
-    scheduler.error(message)
-  }
-
-  override def stop() {
-    super.stop()
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
-        // Note that the slave ID corresponds to the executor ID on that slave
-        slaveIdsWithExecutors -= slaveId.getValue
-        removeExecutor(slaveId.getValue, "Mesos slave lost")
-      }
-    }
-  }
-
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
-  }
-}
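
The offer-acceptance test in the deleted resourceOffers above packs five
conditions into one expression. A hedged, self-contained condensation of it
(field names mirror the diff; the values and slave IDs are made up):

import scala.collection.mutable.{HashMap, HashSet}

object CoarseOfferPolicySketch extends App {
  val maxCores = 8                // stands in for spark.cores.max
  val executorMemory = 512.0      // MB needed per executor
  val MAX_SLAVE_FAILURES = 2      // blacklist threshold from the diff
  var totalCoresAcquired = 0
  val slaveIdsWithExecutors = new HashSet[String]
  val failuresBySlaveId = new HashMap[String, Int]

  // Accept only if we still want cores, the slave has enough memory and
  // at least one CPU, it is not blacklisted, and no executor runs there yet.
  def acceptOffer(slaveId: String, mem: Double, cpus: Int): Boolean =
    totalCoresAcquired < maxCores &&
      mem >= executorMemory &&
      cpus >= 1 &&
      failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
      !slaveIdsWithExecutors.contains(slaveId)

  println(acceptOffer("slave-1", mem = 1024.0, cpus = 4))  // true
  slaveIdsWithExecutors += "slave-1"
  println(acceptOffer("slave-1", mem = 1024.0, cpus = 4))  // false: executor already placed
}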

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
deleted file mode 100644
index 541f86e..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import org.apache.spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import org.apache.spark.TaskState
-import org.apache.spark.util.Utils
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosSchedulerBackend(
-    scheduler: ClusterScheduler,
-    sc: SparkContext,
-    master: String,
-    appName: String)
-  extends SchedulerBackend
-  with MScheduler
-  with Logging {
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
-  val taskIdToSlaveId = new HashMap[Long, String]
-
-  // The serialized executor argument to pass to Mesos, built lazily by createExecArg()
-  var execArgs: Array[Byte] = null
-
-  var classLoader: ClassLoader = null
-
-  override def start() {
-    synchronized {
-      classLoader = Thread.currentThread.getContextClassLoader
-
-      new Thread("MesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = MesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
-  }
-
-  def createExecutorInfo(execId: String): ExecutorInfo = {
-    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-      "Spark home is not set; set it through the spark.home system " +
-      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val uri = System.getProperty("spark.executor.uri")
-    if (uri == null) {
-      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; ./spark-executor".format(basename))
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
-    }
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
-      .build()
-    ExecutorInfo.newBuilder()
-      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
-      .setCommand(command)
-      .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(memory)
-      .build()
-  }
-
-  /**
-   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
-   * containing all the spark.* system properties in the form of (String, String) pairs.
-   */
-  private def createExecArg(): Array[Byte] = {
-    if (execArgs == null) {
-      val props = new HashMap[String, String]
-      val iterator = System.getProperties.entrySet.iterator
-      while (iterator.hasNext) {
-        val entry = iterator.next
-        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-        if (key.startsWith("spark.")) {
-          props(key) = value
-        }
-      }
-      // Serialize the map as an array of (String, String) pairs
-      execArgs = Utils.serialize(props.toArray)
-    }
-    return execArgs
-  }
-
-  private def setClassLoader(): ClassLoader = {
-    val oldClassLoader = Thread.currentThread.getContextClassLoader
-    Thread.currentThread.setContextClassLoader(classLoader)
-    return oldClassLoader
-  }
-
-  private def restoreClassLoader(oldClassLoader: ClassLoader) {
-    Thread.currentThread.setContextClassLoader(oldClassLoader)
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logInfo("Registered as framework ID " + frameworkId.getValue)
-      registeredLock.synchronized {
-        isRegistered = true
-        registeredLock.notifyAll()
-      }
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    val oldClassLoader = setClassLoader()
-    try {
-      synchronized {
-        // Build a big list of the offerable workers, and remember their indices so that we can
-        // figure out which Offer to reply to for each worker
-        val offerableIndices = new ArrayBuffer[Int]
-        val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
-        def enoughMemory(o: Offer) = {
-          val mem = getResource(o.getResourcesList, "mem")
-          val slaveId = o.getSlaveId.getValue
-          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
-        }
-
-        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-          offerableIndices += index
-          offerableWorkers += new WorkerOffer(
-            offer.getSlaveId.getValue,
-            offer.getHostname,
-            getResource(offer.getResourcesList, "cpus").toInt)
-        }
-
-        // Call into the ClusterScheduler
-        val taskLists = scheduler.resourceOffers(offerableWorkers)
-
-        // Build a list of Mesos tasks for each slave
-        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
-        for ((taskList, index) <- taskLists.zipWithIndex) {
-          if (!taskList.isEmpty) {
-            val offerNum = offerableIndices(index)
-            val slaveId = offers(offerNum).getSlaveId.getValue
-            slaveIdsWithExecutors += slaveId
-            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
-            for (taskDesc <- taskList) {
-              taskIdToSlaveId(taskDesc.taskId) = slaveId
-              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
-            }
-          }
-        }
-
-        // Reply to the offers
-        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-        for (i <- 0 until offers.size) {
-          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
-        }
-      }
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val cpuResource = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
-      .build()
-    return MesosTaskInfo.newBuilder()
-      .setTaskId(taskId)
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(createExecutorInfo(slaveId))
-      .setName(task.name)
-      .addResources(cpuResource)
-      .setData(ByteString.copyFrom(task.serializedTask))
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val oldClassLoader = setClassLoader()
-    try {
-      val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
-      synchronized {
-        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
-        }
-        if (isFinished(status.getState)) {
-          taskIdToSlaveId.remove(tid)
-        }
-      }
-      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logError("Mesos error: " + message)
-      scheduler.error(message)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def stop() {
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def reviveOffers() {
-    driver.reviveOffers()
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logInfo("Mesos slave lost: " + slaveId.getValue)
-      synchronized {
-        slaveIdsWithExecutors -= slaveId.getValue
-      }
-      scheduler.executorLost(slaveId.getValue, reason)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    recordSlaveLost(d, slaveId, SlaveLost())
-  }
-
-  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
-                            slaveId: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
-                                                                 slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status))
-  }
-
-  // TODO: query Mesos for number of cores
-  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
-}
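
createExecArg in both versions of this file serializes the spark.* system
properties as an Array[(String, String)]. A standalone sketch of that step,
with plain Java serialization standing in for org.apache.spark.util.Utils.serialize:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap

object ExecArgSketch extends App {
  System.setProperty("spark.default.parallelism", "8")  // example property
  val props = new HashMap[String, String]
  for (entry <- System.getProperties.entrySet) {
    val (key, value) = (entry.getKey.toString, entry.getValue.toString)
    if (key.startsWith("spark.")) {
      props(key) = value
    }
  }
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(props.toArray)  // Array[(String, String)] -- what the executor deserializes
  out.close()
  println("serialized %d spark.* properties into %d bytes".format(props.size, bytes.size))
}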

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 3ec9760..453394d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -21,7 +21,7 @@ import scala.util.Random
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index d1868dc..42e9be6 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler
 
 import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
+import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.Page.Executors
 import org.apache.spark.ui.UIUtils

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 3b428ef..b39c0e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{NodeSeq, Node}
 
-import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
 import org.apache.spark.ui.Page._
 import org.apache.spark.ui.UIUtils._
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5d46f38..eb3b4e8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -21,10 +21,8 @@ import scala.Seq
 import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
 
 import org.apache.spark.{ExceptionFailure, SparkContext, Success}
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.executor.TaskMetrics
-import collection.mutable
+import org.apache.spark.scheduler._
 
 /**
  * Tracks task-level information to be displayed in the UI.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 6aecef5..e7eab37 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,8 +32,8 @@ import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.{ExceptionFailure, SparkContext, Success}
 import org.apache.spark.scheduler._
 import collection.mutable
-import org.apache.spark.scheduler.cluster.SchedulingMode
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.util.Utils
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index b3d3666..06810d8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 import scala.xml.Node
 
-import org.apache.spark.scheduler.Stage
-import org.apache.spark.scheduler.cluster.Schedulable
+import org.apache.spark.scheduler.{Schedulable, Stage}
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index a9969ab..163a374 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
+import org.apache.spark.{ExceptionFailure}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui.Page._
 import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{ExceptionFailure}
-import org.apache.spark.scheduler.cluster.TaskInfo
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
 
 /** Page showing statistics and task list for a given stage */
 private[spark] class StagePage(parent: JobProgressUI) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 32776ea..07db862 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -22,8 +22,7 @@ import java.util.Date
 import scala.xml.Node
 import scala.collection.mutable.HashSet
 
-import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
-import org.apache.spark.scheduler.Stage
+import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 94f66c9..9ed591e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -32,9 +32,9 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
 import org.apache.spark.{FetchFailed, Success, TaskEndReason}
 import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
 
-import org.apache.spark.scheduler.cluster.Pool
-import org.apache.spark.scheduler.cluster.SchedulingMode
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.Pool
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
 /**
  * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler