Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/25 01:35:47 UTC
[16/20] git commit: Renamed ClusterScheduler to TaskSchedulerImpl
Renamed ClusterScheduler to TaskSchedulerImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/30186aa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/30186aa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/30186aa2
Branch: refs/heads/master
Commit: 30186aa2648f90d0ad4e312d28e99c9378ea317a
Parents: c06945c
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 20 14:58:04 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 20 14:58:04 2013 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 20 +-
.../spark/scheduler/ClusterScheduler.scala | 473 -------------------
.../spark/scheduler/TaskResultGetter.scala | 2 +-
.../spark/scheduler/TaskSchedulerImpl.scala | 473 +++++++++++++++++++
.../apache/spark/scheduler/TaskSetManager.scala | 2 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 4 +-
.../cluster/SimrSchedulerBackend.scala | 4 +-
.../cluster/SparkDeploySchedulerBackend.scala | 4 +-
.../mesos/CoarseMesosSchedulerBackend.scala | 4 +-
.../cluster/mesos/MesosSchedulerBackend.scala | 4 +-
.../spark/scheduler/local/LocalBackend.scala | 6 +-
.../SparkContextSchedulerCreationSuite.scala | 6 +-
.../spark/scheduler/ClusterSchedulerSuite.scala | 10 +-
.../spark/scheduler/TaskResultGetterSuite.scala | 6 +-
.../spark/scheduler/TaskSetManagerSuite.scala | 2 +-
15 files changed, 510 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 663b473..ad3337d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1044,25 +1044,25 @@ object SparkContext {
master match {
case "local" =>
- val scheduler = new ClusterScheduler(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+ val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, 1)
scheduler.initialize(backend)
scheduler
case LOCAL_N_REGEX(threads) =>
- val scheduler = new ClusterScheduler(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+ val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threads.toInt)
scheduler.initialize(backend)
scheduler
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
- val scheduler = new ClusterScheduler(sc, maxFailures.toInt, isLocal = true)
+ val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalBackend(scheduler, threads.toInt)
scheduler.initialize(backend)
scheduler
case SPARK_REGEX(sparkUrl) =>
- val scheduler = new ClusterScheduler(sc)
+ val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
scheduler.initialize(backend)
@@ -1077,7 +1077,7 @@ object SparkContext {
memoryPerSlaveInt, SparkContext.executorMemoryRequested))
}
- val scheduler = new ClusterScheduler(sc)
+ val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val masterUrls = localCluster.start()
@@ -1092,7 +1092,7 @@ object SparkContext {
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
- cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+ cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
@@ -1108,7 +1108,7 @@ object SparkContext {
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
- cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+ cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case th: Throwable => {
@@ -1118,7 +1118,7 @@ object SparkContext {
val backend = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
- val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+ val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
@@ -1131,7 +1131,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
- val scheduler = new ClusterScheduler(sc)
+ val scheduler = new TaskSchedulerImpl(sc)
val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
@@ -1143,7 +1143,7 @@ object SparkContext {
scheduler
case SIMR_REGEX(simrUrl) =>
- val scheduler = new ClusterScheduler(sc)
+ val scheduler = new TaskSchedulerImpl(sc)
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
scheduler.initialize(backend)
scheduler
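
Every branch of createTaskScheduler above now follows the same three-step wiring, just with the renamed class: build a TaskSchedulerImpl, build a backend that points back at it, and hand the backend to the scheduler (SparkContext calls start() on the scheduler later). A minimal sketch of that pattern, using the standalone-deploy branch from the diff above (sc, masterUrls and appName as defined there):

    // was: val scheduler = new ClusterScheduler(sc)
    val scheduler = new TaskSchedulerImpl(sc)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
    scheduler.initialize(backend)   // builds the FIFO/FAIR pools and records the backend
    scheduler                       // returned to SparkContext, which later calls scheduler.start()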
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
deleted file mode 100644
index 1ad735b..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{TimerTask, Timer}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.concurrent.duration._
-
-import org.apache.spark._
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-
-/**
- * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
- * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
- * It handles common logic, like determining a scheduling order across jobs, waking up to launch
- * speculative tasks, etc.
- *
- * Clients should first call initialize() and start(), then submit task sets through the
- * runTasks method.
- *
- * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
- * threads, so it needs locks in public API methods to maintain its state. In addition, some
- * SchedulerBackends synchronize on themselves when they want to send events here, and then
- * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
- * we are holding a lock on ourselves.
- */
-private[spark] class ClusterScheduler(
- val sc: SparkContext,
- val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt,
- isLocal: Boolean = false) extends TaskScheduler with Logging {
-
- // How often to check for speculative tasks
- val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
-
- // Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
-
- // TaskSetManagers are not thread safe, so any access to one should be synchronized
- // on this class.
- val activeTaskSets = new HashMap[String, TaskSetManager]
-
- val taskIdToTaskSetId = new HashMap[Long, String]
- val taskIdToExecutorId = new HashMap[Long, String]
- val taskSetTaskIds = new HashMap[String, HashSet[Long]]
-
- @volatile private var hasReceivedTask = false
- @volatile private var hasLaunchedTask = false
- private val starvationTimer = new Timer(true)
-
- // Incrementing task IDs
- val nextTaskId = new AtomicLong(0)
-
- // Which executor IDs we have executors on
- val activeExecutorIds = new HashSet[String]
-
- // The set of executors we have on each host; this is used to compute hostsAlive, which
- // in turn is used to decide when we can attain data locality on a given host
- private val executorsByHost = new HashMap[String, HashSet[String]]
-
- private val executorIdToHost = new HashMap[String, String]
-
- // Listener object to pass upcalls into
- var dagScheduler: DAGScheduler = null
-
- var backend: SchedulerBackend = null
-
- val mapOutputTracker = SparkEnv.get.mapOutputTracker
-
- var schedulableBuilder: SchedulableBuilder = null
- var rootPool: Pool = null
- // default scheduler is FIFO
- val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.scheduler.mode", "FIFO"))
-
- // This is a var so that we can reset it for testing purposes.
- private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
-
- override def setDAGScheduler(dagScheduler: DAGScheduler) {
- this.dagScheduler = dagScheduler
- }
-
- def initialize(backend: SchedulerBackend) {
- this.backend = backend
- // temporarily set rootPool name to empty
- rootPool = new Pool("", schedulingMode, 0, 0)
- schedulableBuilder = {
- schedulingMode match {
- case SchedulingMode.FIFO =>
- new FIFOSchedulableBuilder(rootPool)
- case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool)
- }
- }
- schedulableBuilder.buildPools()
- }
-
- def newTaskId(): Long = nextTaskId.getAndIncrement()
-
- override def start() {
- backend.start()
-
- if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
- logInfo("Starting speculative execution thread")
- import sc.env.actorSystem.dispatcher
- sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
- SPECULATION_INTERVAL milliseconds) {
- checkSpeculatableTasks()
- }
- }
- }
-
- override def submitTasks(taskSet: TaskSet) {
- val tasks = taskSet.tasks
- logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
- this.synchronized {
- val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
- activeTaskSets(taskSet.id) = manager
- schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
- taskSetTaskIds(taskSet.id) = new HashSet[Long]()
-
- if (!isLocal && !hasReceivedTask) {
- starvationTimer.scheduleAtFixedRate(new TimerTask() {
- override def run() {
- if (!hasLaunchedTask) {
- logWarning("Initial job has not accepted any resources; " +
- "check your cluster UI to ensure that workers are registered " +
- "and have sufficient memory")
- } else {
- this.cancel()
- }
- }
- }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
- }
- hasReceivedTask = true
- }
- backend.reviveOffers()
- }
-
- override def cancelTasks(stageId: Int): Unit = synchronized {
- logInfo("Cancelling stage " + stageId)
- activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
- // There are two possible cases here:
- // 1. The task set manager has been created and some tasks have been scheduled.
- // In this case, send a kill signal to the executors to kill the task and then abort
- // the stage.
-      // 2. The task set manager has been created but no tasks have been scheduled. In this case,
- // simply abort the stage.
- val taskIds = taskSetTaskIds(tsm.taskSet.id)
- if (taskIds.size > 0) {
- taskIds.foreach { tid =>
- val execId = taskIdToExecutorId(tid)
- backend.killTask(tid, execId)
- }
- }
- logInfo("Stage %d was cancelled".format(stageId))
- tsm.removeAllRunningTasks()
- taskSetFinished(tsm)
- }
- }
-
- def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
- // Check to see if the given task set has been removed. This is possible in the case of
- // multiple unrecoverable task failures (e.g. if the entire task set is killed when it has
-    // more than one running task).
- if (activeTaskSets.contains(manager.taskSet.id)) {
- activeTaskSets -= manager.taskSet.id
- manager.parent.removeSchedulable(manager)
- logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
- taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
- taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
- taskSetTaskIds.remove(manager.taskSet.id)
- }
- }
-
- /**
- * Called by cluster manager to offer resources on slaves. We respond by asking our active task
- * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
- * that tasks are balanced across the cluster.
- */
- def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
- SparkEnv.set(sc.env)
-
- // Mark each slave as alive and remember its hostname
- for (o <- offers) {
- executorIdToHost(o.executorId) = o.host
- if (!executorsByHost.contains(o.host)) {
- executorsByHost(o.host) = new HashSet[String]()
- executorGained(o.executorId, o.host)
- }
- }
-
- // Build a list of tasks to assign to each worker
- val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
- val availableCpus = offers.map(o => o.cores).toArray
- val sortedTaskSets = rootPool.getSortedTaskSetQueue()
- for (taskSet <- sortedTaskSets) {
- logDebug("parentName: %s, name: %s, runningTasks: %s".format(
- taskSet.parent.name, taskSet.name, taskSet.runningTasks))
- }
-
- // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
- // of locality levels so that it gets a chance to launch local tasks on all of them.
- var launchedTask = false
- for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
- do {
- launchedTask = false
- for (i <- 0 until offers.size) {
- val execId = offers(i).executorId
- val host = offers(i).host
- for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetId(tid) = taskSet.taskSet.id
- taskSetTaskIds(taskSet.taskSet.id) += tid
- taskIdToExecutorId(tid) = execId
- activeExecutorIds += execId
- executorsByHost(host) += execId
- availableCpus(i) -= 1
- launchedTask = true
- }
- }
- } while (launchedTask)
- }
-
- if (tasks.size > 0) {
- hasLaunchedTask = true
- }
- return tasks
- }
-
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- var failedExecutor: Option[String] = None
- synchronized {
- try {
- if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
- // We lost this entire executor, so remember that it's gone
- val execId = taskIdToExecutorId(tid)
- if (activeExecutorIds.contains(execId)) {
- removeExecutor(execId)
- failedExecutor = Some(execId)
- }
- }
- taskIdToTaskSetId.get(tid) match {
- case Some(taskSetId) =>
- if (TaskState.isFinished(state)) {
- taskIdToTaskSetId.remove(tid)
- if (taskSetTaskIds.contains(taskSetId)) {
- taskSetTaskIds(taskSetId) -= tid
- }
- taskIdToExecutorId.remove(tid)
- }
- activeTaskSets.get(taskSetId).foreach { taskSet =>
- if (state == TaskState.FINISHED) {
- taskSet.removeRunningTask(tid)
- taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
- } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
- taskSet.removeRunningTask(tid)
- taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
- }
- }
- case None =>
- logInfo("Ignoring update from TID " + tid + " because its task set is gone")
- }
- } catch {
- case e: Exception => logError("Exception in statusUpdate", e)
- }
- }
- // Update the DAGScheduler without holding a lock on this, since that can deadlock
- if (failedExecutor != None) {
- dagScheduler.executorLost(failedExecutor.get)
- backend.reviveOffers()
- }
- }
-
- def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
- taskSetManager.handleTaskGettingResult(tid)
- }
-
- def handleSuccessfulTask(
- taskSetManager: TaskSetManager,
- tid: Long,
- taskResult: DirectTaskResult[_]) = synchronized {
- taskSetManager.handleSuccessfulTask(tid, taskResult)
- }
-
- def handleFailedTask(
- taskSetManager: TaskSetManager,
- tid: Long,
- taskState: TaskState,
- reason: Option[TaskEndReason]) = synchronized {
- taskSetManager.handleFailedTask(tid, taskState, reason)
- if (taskState != TaskState.KILLED) {
- // Need to revive offers again now that the task set manager state has been updated to
- // reflect failed tasks that need to be re-run.
- backend.reviveOffers()
- }
- }
-
- def error(message: String) {
- synchronized {
- if (activeTaskSets.size > 0) {
- // Have each task set throw a SparkException with the error
- for ((taskSetId, manager) <- activeTaskSets) {
- try {
- manager.error(message)
- } catch {
- case e: Exception => logError("Exception in error callback", e)
- }
- }
- } else {
- // No task sets are active but we still got an error. Just exit since this
- // must mean the error is during registration.
- // It might be good to do something smarter here in the future.
- logError("Exiting due to error from cluster scheduler: " + message)
- System.exit(1)
- }
- }
- }
-
- override def stop() {
- if (backend != null) {
- backend.stop()
- }
- if (taskResultGetter != null) {
- taskResultGetter.stop()
- }
-
- // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
- // TODO: Do something better !
- Thread.sleep(5000L)
- }
-
- override def defaultParallelism() = backend.defaultParallelism()
-
- // Check for speculatable tasks in all our active jobs.
- def checkSpeculatableTasks() {
- var shouldRevive = false
- synchronized {
- shouldRevive = rootPool.checkSpeculatableTasks()
- }
- if (shouldRevive) {
- backend.reviveOffers()
- }
- }
-
- // Check for pending tasks in all our active jobs.
- def hasPendingTasks: Boolean = {
- synchronized {
- rootPool.hasPendingTasks()
- }
- }
-
- def executorLost(executorId: String, reason: ExecutorLossReason) {
- var failedExecutor: Option[String] = None
-
- synchronized {
- if (activeExecutorIds.contains(executorId)) {
- val hostPort = executorIdToHost(executorId)
- logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
- removeExecutor(executorId)
- failedExecutor = Some(executorId)
- } else {
- // We may get multiple executorLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor " + executorId + " (already removed): " + reason)
- }
- }
- // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
- if (failedExecutor != None) {
- dagScheduler.executorLost(failedExecutor.get)
- backend.reviveOffers()
- }
- }
-
- /** Remove an executor from all our data structures and mark it as lost */
- private def removeExecutor(executorId: String) {
- activeExecutorIds -= executorId
- val host = executorIdToHost(executorId)
- val execs = executorsByHost.getOrElse(host, new HashSet)
- execs -= executorId
- if (execs.isEmpty) {
- executorsByHost -= host
- }
- executorIdToHost -= executorId
- rootPool.executorLost(executorId, host)
- }
-
- def executorGained(execId: String, host: String) {
- dagScheduler.executorGained(execId, host)
- }
-
- def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
- executorsByHost.get(host).map(_.toSet)
- }
-
- def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
- executorsByHost.contains(host)
- }
-
- def isExecutorAlive(execId: String): Boolean = synchronized {
- activeExecutorIds.contains(execId)
- }
-
- // By default, rack is unknown
- def getRackForHost(value: String): Option[String] = None
-}
-
-
-private[spark] object ClusterScheduler {
- /**
- * Used to balance containers across hosts.
- *
- * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
- * resource offers representing the order in which the offers should be used. The resource
- * offers are ordered such that we'll allocate one container on each host before allocating a
- * second container on any host, and so on, in order to reduce the damage if a host fails.
- *
-   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
-   * [o1, o5, o4, o2, o6, o3]
- */
- def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
- val _keyList = new ArrayBuffer[K](map.size)
- _keyList ++= map.keys
-
- // order keyList based on population of value in map
- val keyList = _keyList.sortWith(
- (left, right) => map(left).size > map(right).size
- )
-
- val retval = new ArrayBuffer[T](keyList.size * 2)
- var index = 0
- var found = true
-
- while (found) {
- found = false
- for (key <- keyList) {
- val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
- assert(containerList != null)
- // Get the index'th entry for this host - if present
- if (index < containerList.size){
- retval += containerList.apply(index)
- found = true
- }
- }
- index += 1
- }
-
- retval.toList
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 7b5543e..8910272 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
-private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends Logging {
private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
new file mode 100644
index 0000000..7409168
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.concurrent.duration._
+
+import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
+/**
+ * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
+ * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ *
+ * Clients should first call initialize() and start(), then submit task sets through the
+ * runTasks method.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends synchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
+ */
+private[spark] class TaskSchedulerImpl(
+ val sc: SparkContext,
+ val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+ isLocal: Boolean = false) extends TaskScheduler with Logging {
+
+ // How often to check for speculative tasks
+ val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+
+ // Threshold above which we warn user initial TaskSet may be starved
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+
+ // TaskSetManagers are not thread safe, so any access to one should be synchronized
+ // on this class.
+ val activeTaskSets = new HashMap[String, TaskSetManager]
+
+ val taskIdToTaskSetId = new HashMap[Long, String]
+ val taskIdToExecutorId = new HashMap[Long, String]
+ val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+
+ @volatile private var hasReceivedTask = false
+ @volatile private var hasLaunchedTask = false
+ private val starvationTimer = new Timer(true)
+
+ // Incrementing task IDs
+ val nextTaskId = new AtomicLong(0)
+
+ // Which executor IDs we have executors on
+ val activeExecutorIds = new HashSet[String]
+
+ // The set of executors we have on each host; this is used to compute hostsAlive, which
+ // in turn is used to decide when we can attain data locality on a given host
+ private val executorsByHost = new HashMap[String, HashSet[String]]
+
+ private val executorIdToHost = new HashMap[String, String]
+
+ // Listener object to pass upcalls into
+ var dagScheduler: DAGScheduler = null
+
+ var backend: SchedulerBackend = null
+
+ val mapOutputTracker = SparkEnv.get.mapOutputTracker
+
+ var schedulableBuilder: SchedulableBuilder = null
+ var rootPool: Pool = null
+ // default scheduler is FIFO
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.scheduler.mode", "FIFO"))
+
+ // This is a var so that we can reset it for testing purposes.
+ private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
+
+ override def setDAGScheduler(dagScheduler: DAGScheduler) {
+ this.dagScheduler = dagScheduler
+ }
+
+ def initialize(backend: SchedulerBackend) {
+ this.backend = backend
+ // temporarily set rootPool name to empty
+ rootPool = new Pool("", schedulingMode, 0, 0)
+ schedulableBuilder = {
+ schedulingMode match {
+ case SchedulingMode.FIFO =>
+ new FIFOSchedulableBuilder(rootPool)
+ case SchedulingMode.FAIR =>
+ new FairSchedulableBuilder(rootPool)
+ }
+ }
+ schedulableBuilder.buildPools()
+ }
+
+ def newTaskId(): Long = nextTaskId.getAndIncrement()
+
+ override def start() {
+ backend.start()
+
+ if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
+ logInfo("Starting speculative execution thread")
+ import sc.env.actorSystem.dispatcher
+ sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
+ SPECULATION_INTERVAL milliseconds) {
+ checkSpeculatableTasks()
+ }
+ }
+ }
+
+ override def submitTasks(taskSet: TaskSet) {
+ val tasks = taskSet.tasks
+ logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
+ this.synchronized {
+ val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
+ activeTaskSets(taskSet.id) = manager
+ schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
+ taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+ if (!isLocal && !hasReceivedTask) {
+ starvationTimer.scheduleAtFixedRate(new TimerTask() {
+ override def run() {
+ if (!hasLaunchedTask) {
+ logWarning("Initial job has not accepted any resources; " +
+ "check your cluster UI to ensure that workers are registered " +
+ "and have sufficient memory")
+ } else {
+ this.cancel()
+ }
+ }
+ }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }
+ hasReceivedTask = true
+ }
+ backend.reviveOffers()
+ }
+
+ override def cancelTasks(stageId: Int): Unit = synchronized {
+ logInfo("Cancelling stage " + stageId)
+ activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
+ // There are two possible cases here:
+ // 1. The task set manager has been created and some tasks have been scheduled.
+ // In this case, send a kill signal to the executors to kill the task and then abort
+ // the stage.
+      // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+ // simply abort the stage.
+ val taskIds = taskSetTaskIds(tsm.taskSet.id)
+ if (taskIds.size > 0) {
+ taskIds.foreach { tid =>
+ val execId = taskIdToExecutorId(tid)
+ backend.killTask(tid, execId)
+ }
+ }
+ logInfo("Stage %d was cancelled".format(stageId))
+ tsm.removeAllRunningTasks()
+ taskSetFinished(tsm)
+ }
+ }
+
+ def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
+ // Check to see if the given task set has been removed. This is possible in the case of
+ // multiple unrecoverable task failures (e.g. if the entire task set is killed when it has
+    // more than one running task).
+ if (activeTaskSets.contains(manager.taskSet.id)) {
+ activeTaskSets -= manager.taskSet.id
+ manager.parent.removeSchedulable(manager)
+ logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
+ taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
+ taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
+ taskSetTaskIds.remove(manager.taskSet.id)
+ }
+ }
+
+ /**
+ * Called by cluster manager to offer resources on slaves. We respond by asking our active task
+ * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
+ * that tasks are balanced across the cluster.
+ */
+ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+ SparkEnv.set(sc.env)
+
+ // Mark each slave as alive and remember its hostname
+ for (o <- offers) {
+ executorIdToHost(o.executorId) = o.host
+ if (!executorsByHost.contains(o.host)) {
+ executorsByHost(o.host) = new HashSet[String]()
+ executorGained(o.executorId, o.host)
+ }
+ }
+
+ // Build a list of tasks to assign to each worker
+ val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+ val availableCpus = offers.map(o => o.cores).toArray
+ val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+ for (taskSet <- sortedTaskSets) {
+ logDebug("parentName: %s, name: %s, runningTasks: %s".format(
+ taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+ }
+
+ // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+ // of locality levels so that it gets a chance to launch local tasks on all of them.
+ var launchedTask = false
+ for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+ do {
+ launchedTask = false
+ for (i <- 0 until offers.size) {
+ val execId = offers(i).executorId
+ val host = offers(i).host
+ for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetId(tid) = taskSet.taskSet.id
+ taskSetTaskIds(taskSet.taskSet.id) += tid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ executorsByHost(host) += execId
+ availableCpus(i) -= 1
+ launchedTask = true
+ }
+ }
+ } while (launchedTask)
+ }
+
+ if (tasks.size > 0) {
+ hasLaunchedTask = true
+ }
+ return tasks
+ }
+
+ def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ var failedExecutor: Option[String] = None
+ synchronized {
+ try {
+ if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
+ // We lost this entire executor, so remember that it's gone
+ val execId = taskIdToExecutorId(tid)
+ if (activeExecutorIds.contains(execId)) {
+ removeExecutor(execId)
+ failedExecutor = Some(execId)
+ }
+ }
+ taskIdToTaskSetId.get(tid) match {
+ case Some(taskSetId) =>
+ if (TaskState.isFinished(state)) {
+ taskIdToTaskSetId.remove(tid)
+ if (taskSetTaskIds.contains(taskSetId)) {
+ taskSetTaskIds(taskSetId) -= tid
+ }
+ taskIdToExecutorId.remove(tid)
+ }
+ activeTaskSets.get(taskSetId).foreach { taskSet =>
+ if (state == TaskState.FINISHED) {
+ taskSet.removeRunningTask(tid)
+ taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+ } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+ taskSet.removeRunningTask(tid)
+ taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+ }
+ }
+ case None =>
+ logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+ }
+ } catch {
+ case e: Exception => logError("Exception in statusUpdate", e)
+ }
+ }
+ // Update the DAGScheduler without holding a lock on this, since that can deadlock
+ if (failedExecutor != None) {
+ dagScheduler.executorLost(failedExecutor.get)
+ backend.reviveOffers()
+ }
+ }
+
+ def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
+ taskSetManager.handleTaskGettingResult(tid)
+ }
+
+ def handleSuccessfulTask(
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ taskResult: DirectTaskResult[_]) = synchronized {
+ taskSetManager.handleSuccessfulTask(tid, taskResult)
+ }
+
+ def handleFailedTask(
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ taskState: TaskState,
+ reason: Option[TaskEndReason]) = synchronized {
+ taskSetManager.handleFailedTask(tid, taskState, reason)
+ if (taskState != TaskState.KILLED) {
+ // Need to revive offers again now that the task set manager state has been updated to
+ // reflect failed tasks that need to be re-run.
+ backend.reviveOffers()
+ }
+ }
+
+ def error(message: String) {
+ synchronized {
+ if (activeTaskSets.size > 0) {
+ // Have each task set throw a SparkException with the error
+ for ((taskSetId, manager) <- activeTaskSets) {
+ try {
+ manager.error(message)
+ } catch {
+ case e: Exception => logError("Exception in error callback", e)
+ }
+ }
+ } else {
+ // No task sets are active but we still got an error. Just exit since this
+ // must mean the error is during registration.
+ // It might be good to do something smarter here in the future.
+ logError("Exiting due to error from cluster scheduler: " + message)
+ System.exit(1)
+ }
+ }
+ }
+
+ override def stop() {
+ if (backend != null) {
+ backend.stop()
+ }
+ if (taskResultGetter != null) {
+ taskResultGetter.stop()
+ }
+
+ // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
+ // TODO: Do something better !
+ Thread.sleep(5000L)
+ }
+
+ override def defaultParallelism() = backend.defaultParallelism()
+
+ // Check for speculatable tasks in all our active jobs.
+ def checkSpeculatableTasks() {
+ var shouldRevive = false
+ synchronized {
+ shouldRevive = rootPool.checkSpeculatableTasks()
+ }
+ if (shouldRevive) {
+ backend.reviveOffers()
+ }
+ }
+
+ // Check for pending tasks in all our active jobs.
+ def hasPendingTasks: Boolean = {
+ synchronized {
+ rootPool.hasPendingTasks()
+ }
+ }
+
+ def executorLost(executorId: String, reason: ExecutorLossReason) {
+ var failedExecutor: Option[String] = None
+
+ synchronized {
+ if (activeExecutorIds.contains(executorId)) {
+ val hostPort = executorIdToHost(executorId)
+ logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
+ removeExecutor(executorId)
+ failedExecutor = Some(executorId)
+ } else {
+ // We may get multiple executorLost() calls with different loss reasons. For example, one
+ // may be triggered by a dropped connection from the slave while another may be a report
+ // of executor termination from Mesos. We produce log messages for both so we eventually
+ // report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
+ }
+ }
+ // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
+ if (failedExecutor != None) {
+ dagScheduler.executorLost(failedExecutor.get)
+ backend.reviveOffers()
+ }
+ }
+
+ /** Remove an executor from all our data structures and mark it as lost */
+ private def removeExecutor(executorId: String) {
+ activeExecutorIds -= executorId
+ val host = executorIdToHost(executorId)
+ val execs = executorsByHost.getOrElse(host, new HashSet)
+ execs -= executorId
+ if (execs.isEmpty) {
+ executorsByHost -= host
+ }
+ executorIdToHost -= executorId
+ rootPool.executorLost(executorId, host)
+ }
+
+ def executorGained(execId: String, host: String) {
+ dagScheduler.executorGained(execId, host)
+ }
+
+ def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
+ executorsByHost.get(host).map(_.toSet)
+ }
+
+ def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
+ executorsByHost.contains(host)
+ }
+
+ def isExecutorAlive(execId: String): Boolean = synchronized {
+ activeExecutorIds.contains(execId)
+ }
+
+ // By default, rack is unknown
+ def getRackForHost(value: String): Option[String] = None
+}
+
+
+private[spark] object TaskSchedulerImpl {
+ /**
+ * Used to balance containers across hosts.
+ *
+ * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+ * resource offers representing the order in which the offers should be used. The resource
+ * offers are ordered such that we'll allocate one container on each host before allocating a
+ * second container on any host, and so on, in order to reduce the damage if a host fails.
+ *
+   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
+   * [o1, o5, o4, o2, o6, o3]
+ */
+ def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
+ val _keyList = new ArrayBuffer[K](map.size)
+ _keyList ++= map.keys
+
+ // order keyList based on population of value in map
+ val keyList = _keyList.sortWith(
+ (left, right) => map(left).size > map(right).size
+ )
+
+ val retval = new ArrayBuffer[T](keyList.size * 2)
+ var index = 0
+ var found = true
+
+ while (found) {
+ found = false
+ for (key <- keyList) {
+ val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
+ assert(containerList != null)
+ // Get the index'th entry for this host - if present
+ if (index < containerList.size){
+ retval += containerList.apply(index)
+ found = true
+ }
+ }
+ index += 1
+ }
+
+ retval.toList
+ }
+}
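
The companion object's prioritizeContainers is the one piece of non-obvious logic carried over unchanged: hosts are sorted by how many offers they hold, and one offer is taken from each host per pass, so that losing a single host costs as little as possible. A small sketch exercising it with the example from the doc comment (illustrative only; the method is private[spark], so a real caller would live inside the org.apache.spark package, e.g. in a test):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    val offersByHost = HashMap(
      "h1" -> ArrayBuffer("o1", "o2", "o3"),
      "h2" -> ArrayBuffer("o4"),
      "h3" -> ArrayBuffer("o5", "o6"))

    // Pass 1 takes o1, o5, o4 (hosts visited in order h1, h3, h2 by offer count),
    // pass 2 takes o2 and o6, and pass 3 takes o3.
    TaskSchedulerImpl.prioritizeContainers(offersByHost)
    // => List(o1, o5, o4, o2, o6, o3)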
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 0fe413a..0ac9829 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -48,7 +48,7 @@ import java.io.NotSerializableException
* task set will be aborted
*/
private[spark] class TaskSetManager(
- sched: ClusterScheduler,
+ sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
clock: Clock = SystemClock)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5797783..5c534a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -29,7 +29,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.{Logging, SparkException, TaskState}
-import org.apache.spark.scheduler.{ClusterScheduler, SchedulerBackend, SlaveLost, TaskDescription,
+import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -43,7 +43,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* (spark.deploy.*).
*/
private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
extends SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 2fbd725..ec3e68e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.scheduler.ClusterScheduler
+import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class SimrSchedulerBackend(
- scheduler: ClusterScheduler,
+ scheduler: TaskSchedulerImpl,
sc: SparkContext,
driverFilePath: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 1d38f0d..404ce7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -22,11 +22,11 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.client.{Client, ClientListener}
import org.apache.spark.deploy.{Command, ApplicationDescription}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, ClusterScheduler}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
- scheduler: ClusterScheduler,
+ scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String],
appName: String)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5481828..39573fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.ClusterScheduler
+import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
/**
@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
* remove this.
*/
private[spark] class CoarseMesosSchedulerBackend(
- scheduler: ClusterScheduler,
+ scheduler: TaskSchedulerImpl,
sc: SparkContext,
master: String,
appName: String)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 773b980..6aa788c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
- TaskDescription, ClusterScheduler, WorkerOffer}
+ TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.Utils
/**
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
* from multiple apps can run on different cores) and in time (a core can switch ownership).
*/
private[spark] class MesosSchedulerBackend(
- scheduler: ClusterScheduler,
+ scheduler: TaskSchedulerImpl,
sc: SparkContext,
master: String,
appName: String)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 6b5f1a5..69c1c04 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -24,7 +24,7 @@ import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
private case class ReviveOffers()
@@ -38,7 +38,7 @@ private case class KillTask(taskId: Long)
* and the ClusterScheduler.
*/
private[spark] class LocalActor(
- scheduler: ClusterScheduler,
+ scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
private val totalCores: Int) extends Actor with Logging {
@@ -78,7 +78,7 @@ private[spark] class LocalActor(
* master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
* on a single Executor (created by the LocalBackend) running locally.
*/
-private[spark] class LocalBackend(scheduler: ClusterScheduler, val totalCores: Int)
+private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
extends SchedulerBackend with ExecutorBackend {
var localActor: ActorRef = null
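
After the rename there is a single scheduler implementation for local and cluster mode alike; only the backend differs, as the local branches of the SparkContext diff show. A minimal sketch of the master = "local" wiring (MAX_LOCAL_TASK_FAILURES is the constant SparkContext uses above):

    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, 1)   // one in-process Executor with a single core
    scheduler.initialize(backend)                  // tasks still flow through the same scheduling pools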
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index d4a7a11..9deed56 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import org.scalatest.{FunSuite, PrivateMethodTester}
-import org.apache.spark.scheduler.{ClusterScheduler, TaskScheduler}
+import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
@@ -27,13 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
- def createTaskScheduler(master: String): ClusterScheduler = {
+ def createTaskScheduler(master: String): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
sc = new SparkContext("local", "test")
val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
- sched.asInstanceOf[ClusterScheduler]
+ sched.asInstanceOf[TaskSchedulerImpl]
}
test("bad-master") {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 35a06c4..702edb8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -29,7 +29,7 @@ class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
- clusterScheduler: ClusterScheduler,
+ clusterScheduler: TaskSchedulerImpl,
taskSet: TaskSet)
extends TaskSetManager(clusterScheduler, taskSet, 0) {
@@ -104,7 +104,7 @@ class FakeTaskSetManager(
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
}
@@ -131,7 +131,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
test("FIFO Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ val clusterScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -158,7 +158,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
test("Fair Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ val clusterScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -215,7 +215,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
test("Nested Pool Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ val clusterScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 9784920..2265619 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.TaskResultBlockId
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
@@ -92,8 +92,8 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
sc = new SparkContext("local[1,1]", "test")
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
- val scheduler: ClusterScheduler = sc.taskScheduler match {
- case clusterScheduler: ClusterScheduler =>
+ val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
+ case clusterScheduler: TaskSchedulerImpl =>
clusterScheduler
case _ =>
assert(false, "Expect local cluster to use ClusterScheduler")
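
The suites recover the concrete scheduler from a running SparkContext with a plain type match, now against TaskSchedulerImpl instead of ClusterScheduler. A minimal sketch of that pattern (it assumes test code living in the org.apache.spark package, since taskScheduler and TaskSchedulerImpl are private[spark]):

    val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
      case impl: TaskSchedulerImpl => impl
      case other => sys.error("expected a TaskSchedulerImpl, got " + other.getClass.getName)
    }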
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30186aa2/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index b34b6f3..771a64f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -58,7 +58,7 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
* to work, and these are required for locality in TaskSetManager.
*/
class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
- extends ClusterScheduler(sc)
+ extends TaskSchedulerImpl(sc)
{
val startedTasks = new ArrayBuffer[Long]
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
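
TaskSetManagerSuite keeps working because its fake scheduler simply extends the renamed class, as the hunk above shows. A minimal sketch of that test-double pattern (the overrides are illustrative; the real FakeClusterScheduler also records started and ended tasks through a FakeDAGScheduler):

    class FakeScheduler(sc: SparkContext, liveExecutors: (String, String)*)  // (execId, host) pairs
      extends TaskSchedulerImpl(sc, maxTaskFailures = 1, isLocal = true) {

      // Pretend the given executors are alive so TaskSetManager can compute locality levels.
      override def isExecutorAlive(execId: String): Boolean =
        liveExecutors.exists(_._1 == execId)

      override def hasExecutorsAliveOnHost(host: String): Boolean =
        liveExecutors.exists(_._2 == host)
    }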