You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/27 00:01:58 UTC
[08/11] Merge pull request #14 from kayousterhout/untangle_scheduler
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
new file mode 100644
index 0000000..50cbc2c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+import com.google.protobuf.ByteString
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
+import org.apache.spark.util.Utils
+
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
+private[spark] class MesosSchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ master: String,
+ appName: String)
+ extends SchedulerBackend
+ with MScheduler
+ with Logging {
+
+ // Registration flag, and the lock used to wait until the scheduler has been registered
+ var isRegistered = false
+ val registeredLock = new Object()
+
+ // Driver for talking to Mesos
+ var driver: SchedulerDriver = null
+
+ // Which slave IDs we have executors on
+ val slaveIdsWithExecutors = new HashSet[String]
+ val taskIdToSlaveId = new HashMap[Long, String]
+
+ // Cached serialized executor argument (the spark.* system properties); see createExecArg()
+ var execArgs: Array[Byte] = null
+
+ var classLoader: ClassLoader = null
+
+ override def start() {
+ synchronized {
+ classLoader = Thread.currentThread.getContextClassLoader
+
+ new Thread("MesosSchedulerBackend driver") {
+ setDaemon(true)
+ override def run() {
+ val scheduler = MesosSchedulerBackend.this
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+ driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+ try {
+ val ret = driver.run()
+ logInfo("driver.run() returned with code " + ret)
+ } catch {
+ case e: Exception => logError("driver.run() failed", e)
+ }
+ }
+ }.start()
+
+ waitForRegister()
+ }
+ }
+
+ def createExecutorInfo(execId: String): ExecutorInfo = {
+ val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+ "Spark home is not set; set it through the spark.home system " +
+ "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+ val environment = Environment.newBuilder()
+ sc.executorEnvs.foreach { case (key, value) =>
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(value)
+ .build())
+ }
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./spark-executor".format(basename))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ val memory = Resource.newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+ .build()
+ ExecutorInfo.newBuilder()
+ .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
+ .setCommand(command)
+ .setData(ByteString.copyFrom(createExecArg()))
+ .addResources(memory)
+ .build()
+ }
+
+ /**
+ * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
+ * containing all the spark.* system properties in the form of (String, String) pairs.
+ */
+ private def createExecArg(): Array[Byte] = {
+ if (execArgs == null) {
+ val props = new HashMap[String, String]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.")) {
+ props(key) = value
+ }
+ }
+ // Serialize the map as an array of (String, String) pairs
+ execArgs = Utils.serialize(props.toArray)
+ }
+ return execArgs
+ }
+
+ private def setClassLoader(): ClassLoader = {
+ val oldClassLoader = Thread.currentThread.getContextClassLoader
+ Thread.currentThread.setContextClassLoader(classLoader)
+ return oldClassLoader
+ }
+
+ private def restoreClassLoader(oldClassLoader: ClassLoader) {
+ Thread.currentThread.setContextClassLoader(oldClassLoader)
+ }
+
+ override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Registered as framework ID " + frameworkId.getValue)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ def waitForRegister() {
+ registeredLock.synchronized {
+ while (!isRegistered) {
+ registeredLock.wait()
+ }
+ }
+ }
+
+ override def disconnected(d: SchedulerDriver) {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+ /**
+ * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
+ * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
+ * tasks are balanced across the cluster.
+ */
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ val oldClassLoader = setClassLoader()
+ try {
+ synchronized {
+ // Build a big list of the offerable workers, and remember their indices so that we can
+ // figure out which Offer to reply to for each worker
+ val offerableIndices = new ArrayBuffer[Int]
+ val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+ def enoughMemory(o: Offer) = {
+ val mem = getResource(o.getResourcesList, "mem")
+ val slaveId = o.getSlaveId.getValue
+ mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ }
+
+ for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+ offerableIndices += index
+ offerableWorkers += new WorkerOffer(
+ offer.getSlaveId.getValue,
+ offer.getHostname,
+ getResource(offer.getResourcesList, "cpus").toInt)
+ }
+
+ // Call into the ClusterScheduler
+ val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+ // Build a list of Mesos tasks for each slave
+ val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+ for ((taskList, index) <- taskLists.zipWithIndex) {
+ if (!taskList.isEmpty) {
+ val offerNum = offerableIndices(index)
+ val slaveId = offers(offerNum).getSlaveId.getValue
+ slaveIdsWithExecutors += slaveId
+ mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+ for (taskDesc <- taskList) {
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+ }
+ }
+ }
+
+ // Reply to the offers
+ val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+ for (i <- 0 until offers.size) {
+ d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+ }
+ }
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ /** Helper function to pull out a resource from a Mesos Resources protobuf */
+ def getResource(res: JList[Resource], name: String): Double = {
+ for (r <- res if r.getName == name) {
+ return r.getScalar.getValue
+ }
+ // If we reached here, no resource with the required name was present
+ throw new IllegalArgumentException("No resource called " + name + " in " + res)
+ }
+
+ /** Turn a Spark TaskDescription into a Mesos task */
+ def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+ val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
+ val cpuResource = Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+ .build()
+ return MesosTaskInfo.newBuilder()
+ .setTaskId(taskId)
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+ .setExecutor(createExecutorInfo(slaveId))
+ .setName(task.name)
+ .addResources(cpuResource)
+ .setData(ByteString.copyFrom(task.serializedTask))
+ .build()
+ }
+
+ /** Check whether a Mesos task state represents a finished task */
+ def isFinished(state: MesosTaskState) = {
+ state == MesosTaskState.TASK_FINISHED ||
+ state == MesosTaskState.TASK_FAILED ||
+ state == MesosTaskState.TASK_KILLED ||
+ state == MesosTaskState.TASK_LOST
+ }
+
+ override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ val oldClassLoader = setClassLoader()
+ try {
+ val tid = status.getTaskId.getValue.toLong
+ val state = TaskState.fromMesos(status.getState)
+ synchronized {
+ if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+ // We lost the executor on this slave, so remember that it's gone
+ slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+ }
+ if (isFinished(status.getState)) {
+ taskIdToSlaveId.remove(tid)
+ }
+ }
+ scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ override def error(d: SchedulerDriver, message: String) {
+ val oldClassLoader = setClassLoader()
+ try {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ override def stop() {
+ if (driver != null) {
+ driver.stop()
+ }
+ }
+
+ override def reviveOffers() {
+ driver.reviveOffers()
+ }
+
+ override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+ private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ synchronized {
+ slaveIdsWithExecutors -= slaveId.getValue
+ }
+ scheduler.executorLost(slaveId.getValue, reason)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ recordSlaveLost(d, slaveId, SlaveLost())
+ }
+
+ override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+ slaveId: SlaveID, status: Int) {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+ slaveId.getValue))
+ recordSlaveLost(d, slaveId, ExecutorExited(status))
+ }
+
+ // TODO: query Mesos for number of cores
+ override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 8cb4d13..e29438f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -31,8 +31,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import akka.actor._
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index e52cb99..a2fda4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -23,8 +23,8 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.{Task, TaskResult, TaskSet}
-import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
+import org.apache.spark.scheduler.{Schedulable, Task, TaskDescription, TaskInfo, TaskLocality,
+ TaskResult, TaskSet, TaskSetManager}
private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index 3dbe61d..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import org.apache.spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import org.apache.spark.TaskState
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
- scheduler: ClusterScheduler,
- sc: SparkContext,
- master: String,
- appName: String)
- extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
- with MScheduler
- with Logging {
-
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
-
- // Cores we have acquired with each Mesos task ID
- val coresByTaskId = new HashMap[Int, Int]
- var totalCoresAcquired = 0
-
- val slaveIdsWithExecutors = new HashSet[String]
-
- val taskIdToSlaveId = new HashMap[Int, String]
- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-
- val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
-
- var nextMesosTaskId = 0
-
- def newMesosTaskId(): Int = {
- val id = nextMesosTaskId
- nextMesosTaskId += 1
- id
- }
-
- override def start() {
- super.start()
-
- synchronized {
- new Thread("CoarseMesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try { {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- }
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
- val uri = System.getProperty("spark.executor.uri")
- if (uri == null) {
- val runScript = new File(sparkHome, "spark-class").getCanonicalPath
- command.setValue(
- "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue(
- "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- return command.build()
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
- * unless we've already launched more than we wanted to.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
- val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
- for (offer <- offers) {
- val slaveId = offer.getSlaveId.toString
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- val taskId = newMesosTaskId()
- taskIdToSlaveId(taskId) = slaveId
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- val task = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
- .setName("Task " + taskId)
- .addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", executorMemory))
- .build()
- d.launchTasks(offer.getId, Collections.singletonList(task), filters)
- } else {
- // Filter it out
- d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
- }
- }
- }
- }
-
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- private def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- // If we reached here, no resource with the required name was present
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
- }
-
- /** Build a Mesos resource protobuf object */
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
-
- /** Check whether a Mesos task state represents a finished task */
- private def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val taskId = status.getTaskId.getValue.toInt
- val state = status.getState
- logInfo("Mesos task " + taskId + " is now " + state)
- synchronized {
- if (isFinished(state)) {
- val slaveId = taskIdToSlaveId(taskId)
- slaveIdsWithExecutors -= slaveId
- taskIdToSlaveId -= taskId
- // Remove the cores we have remembered for this task, if it's in the hashmap
- for (cores <- coresByTaskId.get(taskId)) {
- totalCoresAcquired -= cores
- coresByTaskId -= taskId
- }
- // If it was a failure, mark the slave as failed for blacklisting purposes
- if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
- "is Spark installed on it?")
- }
- }
- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
- }
- }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
- scheduler.error(message)
- }
-
- override def stop() {
- super.stop()
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
- // Note that the slave ID corresponds to the executor ID on that slave
- slaveIdsWithExecutors -= slaveId.getValue
- removeExecutor(slaveId.getValue, "Mesos slave lost")
- }
- }
- }
-
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
deleted file mode 100644
index 541f86e..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import org.apache.spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import org.apache.spark.TaskState
-import org.apache.spark.util.Utils
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosSchedulerBackend(
- scheduler: ClusterScheduler,
- sc: SparkContext,
- master: String,
- appName: String)
- extends SchedulerBackend
- with MScheduler
- with Logging {
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
- val taskIdToSlaveId = new HashMap[Long, String]
-
- // An ExecutorInfo for our tasks
- var execArgs: Array[Byte] = null
-
- var classLoader: ClassLoader = null
-
- override def start() {
- synchronized {
- classLoader = Thread.currentThread.getContextClassLoader
-
- new Thread("MesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = MesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createExecutorInfo(execId: String): ExecutorInfo = {
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val uri = System.getProperty("spark.executor.uri")
- if (uri == null) {
- command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./spark-executor".format(basename))
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- val memory = Resource.newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
- .build()
- ExecutorInfo.newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
- .setCommand(command)
- .setData(ByteString.copyFrom(createExecArg()))
- .addResources(memory)
- .build()
- }
-
- /**
- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
- * containing all the spark.* system properties in the form of (String, String) pairs.
- */
- private def createExecArg(): Array[Byte] = {
- if (execArgs == null) {
- val props = new HashMap[String, String]
- val iterator = System.getProperties.entrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
- if (key.startsWith("spark.")) {
- props(key) = value
- }
- }
- // Serialize the map as an array of (String, String) pairs
- execArgs = Utils.serialize(props.toArray)
- }
- return execArgs
- }
-
- private def setClassLoader(): ClassLoader = {
- val oldClassLoader = Thread.currentThread.getContextClassLoader
- Thread.currentThread.setContextClassLoader(classLoader)
- return oldClassLoader
- }
-
- private def restoreClassLoader(oldClassLoader: ClassLoader) {
- Thread.currentThread.setContextClassLoader(oldClassLoader)
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- val oldClassLoader = setClassLoader()
- try {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
- * tasks are balanced across the cluster.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- val oldClassLoader = setClassLoader()
- try {
- synchronized {
- // Build a big list of the offerable workers, and remember their indices so that we can
- // figure out which Offer to reply to for each worker
- val offerableIndices = new ArrayBuffer[Int]
- val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
- def enoughMemory(o: Offer) = {
- val mem = getResource(o.getResourcesList, "mem")
- val slaveId = o.getSlaveId.getValue
- mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
- }
-
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices += index
- offerableWorkers += new WorkerOffer(
- offer.getSlaveId.getValue,
- offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
- }
-
- // Call into the ClusterScheduler
- val taskLists = scheduler.resourceOffers(offerableWorkers)
-
- // Build a list of Mesos tasks for each slave
- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
- for ((taskList, index) <- taskLists.zipWithIndex) {
- if (!taskList.isEmpty) {
- val offerNum = offerableIndices(index)
- val slaveId = offers(offerNum).getSlaveId.getValue
- slaveIdsWithExecutors += slaveId
- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
- for (taskDesc <- taskList) {
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
- }
- }
- }
-
- // Reply to the offers
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
- for (i <- 0 until offers.size) {
- d.launchTasks(offers(i).getId, mesosTasks(i), filters)
- }
- }
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- // If we reached here, no resource with the required name was present
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
- }
-
- /** Turn a Spark TaskDescription into a Mesos task */
- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
- val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(1).build())
- .build()
- return MesosTaskInfo.newBuilder()
- .setTaskId(taskId)
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(createExecutorInfo(slaveId))
- .setName(task.name)
- .addResources(cpuResource)
- .setData(ByteString.copyFrom(task.serializedTask))
- .build()
- }
-
- /** Check whether a Mesos task state represents a finished task */
- def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val oldClassLoader = setClassLoader()
- try {
- val tid = status.getTaskId.getValue.toLong
- val state = TaskState.fromMesos(status.getState)
- synchronized {
- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
- }
- if (isFinished(status.getState)) {
- taskIdToSlaveId.remove(tid)
- }
- }
- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- val oldClassLoader = setClassLoader()
- try {
- logError("Mesos error: " + message)
- scheduler.error(message)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def stop() {
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def reviveOffers() {
- driver.reviveOffers()
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
- val oldClassLoader = setClassLoader()
- try {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
- }
- scheduler.executorLost(slaveId.getValue, reason)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- recordSlaveLost(d, slaveId, SlaveLost())
- }
-
- override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
- slaveId: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
- slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status))
- }
-
- // TODO: query Mesos for number of cores
- override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 3ec9760..453394d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index d1868dc..42e9be6 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
+import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui.UIUtils
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 3b428ef..b39c0e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node}
-import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.Page._
import org.apache.spark.ui.UIUtils._
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5d46f38..eb3b4e8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -21,10 +21,8 @@ import scala.Seq
import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.executor.TaskMetrics
-import collection.mutable
+import org.apache.spark.scheduler._
/**
* Tracks task-level information to be displayed in the UI.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 6aecef5..e7eab37 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,8 +32,8 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
import org.apache.spark.scheduler._
import collection.mutable
-import org.apache.spark.scheduler.cluster.SchedulingMode
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
/** Web UI showing progress status of all jobs in the given SparkContext. */
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index b3d3666..06810d8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.xml.Node
-import org.apache.spark.scheduler.Stage
-import org.apache.spark.scheduler.cluster.Schedulable
+import org.apache.spark.scheduler.{Schedulable, Stage}
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index a9969ab..163a374 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
+import org.apache.spark.{ExceptionFailure}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.UIUtils._
import org.apache.spark.ui.Page._
import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{ExceptionFailure}
-import org.apache.spark.scheduler.cluster.TaskInfo
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
/** Page showing statistics and task list for a given stage */
private[spark] class StagePage(parent: JobProgressUI) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 32776ea..07db862 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -22,8 +22,7 @@ import java.util.Date
import scala.xml.Node
import scala.collection.mutable.HashSet
-import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
-import org.apache.spark.scheduler.Stage
+import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/976fe60f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 94f66c9..9ed591e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -32,9 +32,9 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
import org.apache.spark.{FetchFailed, Success, TaskEndReason}
import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
-import org.apache.spark.scheduler.cluster.Pool
-import org.apache.spark.scheduler.cluster.SchedulingMode
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.Pool
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler