You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/25 01:35:37 UTC
[06/20] git commit: Merge remote-tracking branch 'upstream/master'
into consolidate_schedulers
Merge remote-tracking branch 'upstream/master' into consolidate_schedulers
Conflicts:
core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/150615a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/150615a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/150615a3
Branch: refs/heads/master
Commit: 150615a31e0c2a5112c37cca62dd80dba8a12fab
Parents: 68e5ad5 39af914
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Nov 13 14:38:44 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Nov 13 14:38:44 2013 -0800
----------------------------------------------------------------------
README.md | 2 +-
.../scala/org/apache/spark/SparkContext.scala | 32 +-
.../apache/spark/deploy/SparkHadoopUtil.scala | 31 +-
.../spark/deploy/worker/ExecutorRunner.scala | 2 +-
.../executor/CoarseGrainedExecutorBackend.scala | 2 +-
.../org/apache/spark/executor/Executor.scala | 7 +-
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +
.../spark/scheduler/ClusterScheduler.scala | 23 +-
.../apache/spark/scheduler/DAGScheduler.scala | 13 +-
.../org/apache/spark/scheduler/JobLogger.scala | 664 +++++++++++--------
.../apache/spark/scheduler/ShuffleMapTask.scala | 23 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 14 +-
.../cluster/SimrSchedulerBackend.scala | 12 +
.../spark/serializer/KryoSerializer.scala | 52 +-
.../org/apache/spark/storage/BlockInfo.scala | 18 +-
.../org/apache/spark/storage/BlockManager.scala | 20 +-
.../spark/storage/BlockObjectWriter.scala | 27 +-
.../apache/spark/storage/DiskBlockManager.scala | 49 +-
.../org/apache/spark/storage/DiskStore.scala | 4 +-
.../spark/storage/ShuffleBlockManager.scala | 189 +++++-
.../spark/storage/StoragePerfTester.scala | 10 +-
.../org/apache/spark/ui/jobs/StagePage.scala | 31 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 11 +-
.../org/apache/spark/util/MetadataCleaner.scala | 2 +-
.../scala/org/apache/spark/util/Utils.scala | 17 +-
.../apache/spark/util/collection/BitSet.scala | 103 +++
.../spark/util/collection/OpenHashMap.scala | 152 +++++
.../spark/util/collection/OpenHashSet.scala | 271 ++++++++
.../collection/PrimitiveKeyOpenHashMap.scala | 127 ++++
.../spark/util/collection/PrimitiveVector.scala | 51 ++
.../deploy/worker/ExecutorRunnerTest.scala | 19 +
.../apache/spark/scheduler/JobLoggerSuite.scala | 6 +-
.../spark/scheduler/SparkListenerSuite.scala | 2 +-
.../spark/storage/DiskBlockManagerSuite.scala | 84 +++
.../spark/util/collection/BitSetSuite.scala | 73 ++
.../util/collection/OpenHashMapSuite.scala | 148 +++++
.../util/collection/OpenHashSetSuite.scala | 145 ++++
.../PrimitiveKeyOpenHashSetSuite.scala | 90 +++
docs/cluster-overview.md | 14 +-
docs/ec2-scripts.md | 2 +-
docs/running-on-yarn.md | 1 +
ec2/spark_ec2.py | 68 +-
pom.xml | 6 +
project/SparkBuild.scala | 5 +-
.../org/apache/spark/repl/SparkIMain.scala | 11 +-
.../scala/org/apache/spark/repl/ReplSuite.scala | 35 +-
spark-class | 13 +
spark-class2.cmd | 7 +
.../streaming/dstream/NetworkInputDStream.scala | 4 +-
.../spark/streaming/InputStreamsSuite.scala | 83 ++-
yarn/pom.xml | 50 ++
.../spark/deploy/yarn/ApplicationMaster.scala | 2 +-
.../org/apache/spark/deploy/yarn/Client.scala | 276 ++++----
.../yarn/ClientDistributedCacheManager.scala | 228 +++++++
.../spark/deploy/yarn/WorkerRunnable.scala | 42 +-
.../ClientDistributedCacheManagerSuite.scala | 220 ++++++
56 files changed, 2873 insertions(+), 722 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/150615a3/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/150615a3/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
index c7d1295,0000000..c5d7ca0
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
@@@ -1,493 -1,0 +1,486 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
++import akka.util.duration._
++
+import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
+/**
+ * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
+ * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ *
+ * Clients should first call initialize() and start(), then submit task sets through the
+ * submitTasks method.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends synchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
+ */
+private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = false)
+ extends TaskScheduler with Logging {
+
+ // How often to check for speculative tasks, in milliseconds (used as the initial delay and
+ // the period of the speculation check scheduled in start())
+ val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+
+ // Threshold above which we warn user initial TaskSet may be starved. Used as both the initial
+ // delay and the repeat period of the starvation timer task, so it is in milliseconds.
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+
+ // TaskSetManagers are not thread safe, so any access to one should be synchronized
+ // on this class.
+ val activeTaskSets = new HashMap[String, TaskSetManager]
+
+ // Failure limit handed to every TaskSetManager created in submitTasks()
+ val MAX_TASK_FAILURES = {
+ if (isLocal) {
+ // No sense in retrying if all tasks run locally!
+ 0
+ } else {
+ System.getProperty("spark.task.maxFailures", "4").toInt
+ }
+ }
+
+ // Task ID -> ID of the task set it belongs to; filled in by resourceOffers()
+ val taskIdToTaskSetId = new HashMap[Long, String]
+ // Task ID -> ID of the executor the task was handed to; filled in by resourceOffers()
+ val taskIdToExecutorId = new HashMap[Long, String]
+ // Task set ID -> IDs of its tasks that have been launched and have not yet finished
+ val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+
+ // Set once the first task set has been submitted
+ @volatile private var hasReceivedTask = false
+ // Set by resourceOffers(); read by the starvation timer task to decide whether to keep warning
+ @volatile private var hasLaunchedTask = false
+ // Daemon timer that runs the starvation warning task scheduled in submitTasks()
+ private val starvationTimer = new Timer(true)
+
+ // Incrementing task IDs
+ val nextTaskId = new AtomicLong(0)
+
+ // Which executor IDs we have executors on
+ val activeExecutorIds = new HashSet[String]
+
+ // The set of executors we have on each host; this is used to compute hostsAlive, which
+ // in turn is used to decide when we can attain data locality on a given host
+ private val executorsByHost = new HashMap[String, HashSet[String]]
+
+ // Executor ID -> host it was last offered from; updated on every resource offer
+ private val executorIdToHost = new HashMap[String, String]
+
+ // Listener object to pass upcalls into
+ var dagScheduler: DAGScheduler = null
+
+ // Backend used to launch and kill tasks; assigned in initialize()
+ var backend: SchedulerBackend = null
+
+ val mapOutputTracker = SparkEnv.get.mapOutputTracker
+
+ // Both are created in initialize()
+ var schedulableBuilder: SchedulableBuilder = null
+ var rootPool: Pool = null
+ // default scheduler is FIFO
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.scheduler.mode", "FIFO"))
+
+ // This is a var so that we can reset it for testing purposes.
+ private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
+
+ /** Records the DAGScheduler that this scheduler passes its upcalls into. */
+ override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {
+   this.dagScheduler = dagScheduler
+ }
+
+ /**
+  * Attaches the scheduler backend and builds the scheduling pools for the configured
+  * scheduling mode.
+  *
+  * @param context backend this scheduler will act through
+  */
+ def initialize(context: SchedulerBackend) {
+   backend = context
+   // The root pool's name is left empty for the time being
+   rootPool = new Pool("", schedulingMode, 0, 0)
+   schedulableBuilder = schedulingMode match {
+     case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
+     case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool)
+   }
+   schedulableBuilder.buildPools()
+ }
+
+ // Returns the current value of the task ID counter and atomically advances it by one.
+ def newTaskId(): Long = nextTaskId.getAndIncrement()
+
+ /**
+ * Starts the backend. For a non-local scheduler with the "spark.speculation" property set to
+ * true, also schedules checkSpeculatableTasks() on the actor system's scheduler, first after
+ * SPECULATION_INTERVAL milliseconds and then repeatedly at that same interval.
+ */
+ override def start() {
+ backend.start()
+
+ if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
- new Thread("TaskScheduler speculation check") {
- setDaemon(true)
-
- override def run() {
- logInfo("Starting speculative execution thread")
- while (true) {
- try {
- Thread.sleep(SPECULATION_INTERVAL)
- } catch {
- case e: InterruptedException => {}
- }
- checkSpeculatableTasks()
- }
- }
- }.start()
++ logInfo("Starting speculative execution thread")
++
++ sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
++ SPECULATION_INTERVAL milliseconds) {
++ checkSpeculatableTasks()
++ }
+ }
+ }
+
+ /**
+  * Registers a new task set with the scheduler and asks the backend for resource offers.
+  *
+  * A TaskSetManager is created for the task set and added to the scheduling pools. For the
+  * first task set submitted to a non-local scheduler, a repeating timer task is also started;
+  * it logs a warning every STARVATION_TIMEOUT milliseconds until a task has been launched, at
+  * which point it cancels itself.
+  *
+  * @param taskSet the tasks to schedule
+  */
+ override def submitTasks(taskSet: TaskSet) {
+   logInfo("Adding task set " + taskSet.id + " with " + taskSet.tasks.length + " tasks")
+   this.synchronized {
+     val manager = new TaskSetManager(this, taskSet, MAX_TASK_FAILURES)
+     activeTaskSets(taskSet.id) = manager
+     schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
+     taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+     val isFirstClusterTaskSet = !isLocal && !hasReceivedTask
+     if (isFirstClusterTaskSet) {
+       val starvationWarning = new TimerTask() {
+         override def run() {
+           if (hasLaunchedTask) {
+             // A task has been launched, so there is nothing left to warn about
+             this.cancel()
+           } else {
+             logWarning("Initial job has not accepted any resources; " +
+               "check your cluster UI to ensure that workers are registered " +
+               "and have sufficient memory")
+           }
+         }
+       }
+       starvationTimer.scheduleAtFixedRate(
+         starvationWarning, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+     }
+     hasReceivedTask = true
+   }
+   // Called after releasing our own lock, per the threading note on this class
+   backend.reviveOffers()
+ }
+
+ /**
+  * Cancels the first active task set found for the given stage, if there is one.
+  *
+  * @param stageId ID of the stage whose task set should be cancelled
+  */
+ override def cancelTasks(stageId: Int): Unit = synchronized {
+   logInfo("Cancelling stage " + stageId)
+   activeTaskSets.find(_._2.stageId == stageId).map(_._2).foreach { manager =>
+     // Two situations are possible:
+     // 1. Some tasks of the task set have already been scheduled. Each of them gets a kill
+     //    signal sent to its executor, and then the stage is aborted.
+     // 2. No task has been scheduled yet. The set of task IDs is then empty and the stage is
+     //    simply aborted.
+     for (tid <- taskSetTaskIds(manager.taskSet.id)) {
+       val execId = taskIdToExecutorId(tid)
+       backend.killTask(tid, execId)
+     }
+     manager.error("Stage %d was cancelled".format(stageId))
+   }
+ }
+
+ /**
+  * Drops all bookkeeping for a task set: its entry in activeTaskSets, its place in the parent
+  * pool, and the per-task mappings of the tasks still recorded for it.
+  *
+  * Does nothing if the task set has already been removed. That can happen after multiple
+  * unrecoverable task failures, e.g. when the whole task set is killed while more than one of
+  * its tasks is running.
+  *
+  * @param manager manager of the task set that is done
+  */
+ def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
+   val taskSetId = manager.taskSet.id
+   if (activeTaskSets.contains(taskSetId)) {
+     activeTaskSets -= taskSetId
+     manager.parent.removeSchedulable(manager)
+     logInfo("Remove TaskSet %s from pool %s".format(taskSetId, manager.parent.name))
+     val remainingTaskIds = taskSetTaskIds(taskSetId)
+     taskIdToTaskSetId --= remainingTaskIds
+     taskIdToExecutorId --= remainingTaskIds
+     taskSetTaskIds.remove(taskSetId)
+   }
+ }
+
+ /**
+  * Called by cluster manager to offer resources on slaves. We respond by asking our active task
+  * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
+  * that tasks are balanced across the cluster.
+  *
+  * @param offers the executors (with host and free cores) that tasks may be launched on
+  * @return one sequence of task descriptions per offer, in the same order as `offers`; a
+  *         sequence is empty when nothing was assigned to that offer
+  */
+ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+   SparkEnv.set(sc.env)
+
+   // Mark each slave as alive and remember its hostname
+   for (o <- offers) {
+     executorIdToHost(o.executorId) = o.host
+     if (!executorsByHost.contains(o.host)) {
+       executorsByHost(o.host) = new HashSet[String]()
+       executorGained(o.executorId, o.host)
+     }
+   }
+
+   // Build a list of tasks to assign to each worker
+   val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+   val availableCpus = offers.map(o => o.cores).toArray
+   val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+   for (taskSet <- sortedTaskSets) {
+     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
+       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+   }
+
+   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+   // of locality levels so that it gets a chance to launch local tasks on all of them.
+   var launchedTask = false
+   for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+     // Keep cycling over the offers until a full pass launches nothing at this locality level
+     do {
+       launchedTask = false
+       for (i <- 0 until offers.size) {
+         val execId = offers(i).executorId
+         val host = offers(i).host
+         for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
+           tasks(i) += task
+           val tid = task.taskId
+           taskIdToTaskSetId(tid) = taskSet.taskSet.id
+           taskSetTaskIds(taskSet.taskSet.id) += tid
+           taskIdToExecutorId(tid) = execId
+           activeExecutorIds += execId
+           executorsByHost(host) += execId
+           availableCpus(i) -= 1
+           launchedTask = true
+         }
+       }
+     } while (launchedTask)
+   }
+
+   // `tasks` holds one buffer per offer, so its size only says whether offers arrived. Look
+   // inside the buffers to find out whether any task was actually assigned.
+   if (tasks.exists(_.nonEmpty)) {
+     hasLaunchedTask = true
+   }
+   tasks
+ }
+
+ /**
+  * Processes a task state change reported for a task.
+  *
+  * Under the scheduler lock, a LOST state removes the task's executor if it is still active;
+  * a finished state clears the task's bookkeeping entries; FINISHED, FAILED, KILLED and LOST
+  * results are handed to the task result getter. Exceptions raised in that section are logged
+  * rather than propagated. The DAGScheduler and the backend are notified afterwards, outside
+  * the lock.
+  *
+  * @param tid ID of the task the update is about
+  * @param state the task's new state
+  * @param serializedData payload passed on unchanged to the task result getter
+  */
+ def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+   var failedExecutor: Option[String] = None
+   var taskFailed = false
+   synchronized {
+     try {
+       if (state == TaskState.LOST) {
+         // We lost this entire executor, so remember that it's gone
+         taskIdToExecutorId.get(tid)
+           .filter(execId => activeExecutorIds.contains(execId))
+           .foreach { execId =>
+             removeExecutor(execId)
+             failedExecutor = Some(execId)
+           }
+       }
+       taskIdToTaskSetId.get(tid) match {
+         case None =>
+           logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+         case Some(taskSetId) =>
+           if (TaskState.isFinished(state)) {
+             taskIdToTaskSetId.remove(tid)
+             taskSetTaskIds.get(taskSetId).foreach(_ -= tid)
+             taskIdToExecutorId.remove(tid)
+           }
+           if (state == TaskState.FAILED) {
+             taskFailed = true
+           }
+           activeTaskSets.get(taskSetId).foreach { manager =>
+             state match {
+               case TaskState.FINISHED =>
+                 manager.removeRunningTask(tid)
+                 taskResultGetter.enqueueSuccessfulTask(manager, tid, serializedData)
+               case TaskState.FAILED | TaskState.KILLED | TaskState.LOST =>
+                 manager.removeRunningTask(tid)
+                 taskResultGetter.enqueueFailedTask(manager, tid, state, serializedData)
+               case _ =>
+                 // Any other state needs no action here
+             }
+           }
+       }
+     } catch {
+       case e: Exception => logError("Exception in statusUpdate", e)
+     }
+   }
+   // Update the DAGScheduler without holding a lock on this, since that can deadlock
+   failedExecutor.foreach { execId =>
+     dagScheduler.executorLost(execId)
+     backend.reviveOffers()
+   }
+   if (taskFailed) {
+     // Also revive offers if a task had failed for some reason other than host lost
+     backend.reviveOffers()
+   }
+ }
+
+ /**
+  * Forwards a "task is fetching its result" notification to the task set manager.
+  *
+  * Runs under the scheduler lock, like handleSuccessfulTask and handleFailedTask, because
+  * TaskSetManagers are not thread safe and every access to one must be synchronized on this
+  * class.
+  *
+  * @param taskSetManager manager of the task set the task belongs to
+  * @param tid ID of the task
+  */
+ def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
+   taskSetManager.handleTaskGettingResult(tid)
+ }
+
+ // Passes a successful task result on to the task set manager while holding the scheduler
+ // lock (TaskSetManagers are not thread safe).
+ def handleSuccessfulTask(
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ taskResult: DirectTaskResult[_]) = synchronized {
+ taskSetManager.handleSuccessfulTask(tid, taskResult)
+ }
+
+ // Passes a failed task on to the task set manager while holding the scheduler lock, and
+ // revives offers when the reported state is FINISHED.
+ // NOTE(review): backend.reviveOffers() is called here while holding the lock on this object,
+ // whereas the threading note on this class warns against locking the backend while holding
+ // our own lock -- confirm that reviveOffers() does not synchronize on the backend.
+ def handleFailedTask(
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ taskState: TaskState,
+ reason: Option[TaskEndReason]) = synchronized {
+ taskSetManager.handleFailedTask(tid, taskState, reason)
+ if (taskState == TaskState.FINISHED) {
+ // The task finished successfully but the result was lost, so we should revive offers.
+ backend.reviveOffers()
+ }
+ }
+
+ /**
+  * Reports a scheduler-wide error.
+  *
+  * If task sets are active, each of their managers is told about the error; an exception thrown
+  * by one manager is logged and does not stop the others from being notified. If no task set
+  * is active, the error is logged and the JVM exits with status 1.
+  *
+  * @param message description of the error
+  */
+ def error(message: String) {
+   synchronized {
+     if (activeTaskSets.isEmpty) {
+       // No task sets are active but we still got an error. Just exit since this
+       // must mean the error is during registration.
+       // It might be good to do something smarter here in the future.
+       logError("Exiting due to error from task scheduler: " + message)
+       System.exit(1)
+     } else {
+       // Iterate over a snapshot: taskSetFinished() removes entries from activeTaskSets, and a
+       // manager's error handling presumably ends up calling it (not visible from this file).
+       // Removing from the map while iterating over it could cause managers to be skipped.
+       for (manager <- activeTaskSets.values.toList) {
+         // Have each task set throw a SparkException with the error
+         try {
+           manager.error(message)
+         } catch {
+           case e: Exception => logError("Exception in error callback", e)
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Shuts the scheduler down: cancels the starvation timer, stops the backend and the task
+  * result getter (when they are set), then waits five seconds.
+  */
+ override def stop() {
+   // Cancel the starvation timer so its warning task cannot fire after shutdown and the timer
+   // thread can terminate. Cancelling a timer that has no tasks, or cancelling twice, is
+   // harmless.
+   starvationTimer.cancel()
+   if (backend != null) {
+     backend.stop()
+   }
+   if (taskResultGetter != null) {
+     taskResultGetter.stop()
+   }
+
+   // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
+   // TODO: Do something better !
+   Thread.sleep(5000L)
+ }
+
+ // Delegates to the backend; `backend` must have been set through initialize() first.
+ override def defaultParallelism() = backend.defaultParallelism()
+
+ // Asks the root pool, under the scheduler lock, whether any active job has speculatable
+ // tasks, and revives offers (outside the lock) if it does.
+ def checkSpeculatableTasks() {
+   val shouldRevive = synchronized {
+     rootPool.checkSpeculatableTasks()
+   }
+   if (shouldRevive) {
+     backend.reviveOffers()
+   }
+ }
+
+ // Reports, under the scheduler lock, whether any active job still has pending tasks.
+ def hasPendingTasks: Boolean = synchronized {
+   rootPool.hasPendingTasks()
+ }
+
+ /**
+  * Handles a report that an executor was lost.
+  *
+  * If the executor is still known as active it is removed from the scheduler's state, after
+  * which the DAGScheduler is informed and offers are revived. Reports for an executor that is
+  * no longer active are only logged.
+  *
+  * @param executorId ID of the lost executor
+  * @param reason why the executor was lost; only used in the log messages here
+  */
+ def executorLost(executorId: String, reason: ExecutorLossReason) {
+   val failedExecutor: Option[String] = synchronized {
+     if (activeExecutorIds.contains(executorId)) {
+       val hostPort = executorIdToHost(executorId)
+       logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
+       removeExecutor(executorId)
+       Some(executorId)
+     } else {
+       // We may get multiple executorLost() calls with different loss reasons. For example, one
+       // may be triggered by a dropped connection from the slave while another may be a report
+       // of executor termination from Mesos. We produce log messages for both so we eventually
+       // report the termination reason.
+       logError("Lost an executor " + executorId + " (already removed): " + reason)
+       None
+     }
+   }
+   // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
+   failedExecutor.foreach { execId =>
+     dagScheduler.executorLost(execId)
+     backend.reviveOffers()
+   }
+ }
+
+ /**
+  * Removes an executor from all of the scheduler's data structures and reports the loss to the
+  * root pool. The host's entry in executorsByHost is dropped once its last executor is gone.
+  */
+ private def removeExecutor(executorId: String) {
+   activeExecutorIds -= executorId
+   val host = executorIdToHost(executorId)
+   executorsByHost.get(host).foreach { execsOnHost =>
+     execsOnHost -= executorId
+     if (execsOnHost.isEmpty) {
+       executorsByHost -= host
+     }
+   }
+   executorIdToHost -= executorId
+   rootPool.executorLost(executorId, host)
+ }
+
+ // Forwards the notification to the DAGScheduler. Called from resourceOffers() the first time
+ // an offer arrives from a host that has no entry in executorsByHost.
+ def executorGained(execId: String, host: String) {
+ dagScheduler.executorGained(execId, host)
+ }
+
+ // Returns an immutable copy of the executor IDs recorded for `host`, or None if the host has
+ // no entry in executorsByHost.
+ def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
+ executorsByHost.get(host).map(_.toSet)
+ }
+
+ // True if `host` currently has an entry in executorsByHost.
+ def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
+ executorsByHost.contains(host)
+ }
+
+ // True if `execId` is in activeExecutorIds, i.e. a task has been assigned to it and it has not
+ // been removed since.
+ def isExecutorAlive(execId: String): Boolean = synchronized {
+ activeExecutorIds.contains(execId)
+ }
+
+ // By default, rack is unknown: this implementation ignores its argument and always returns
+ // None.
+ def getRackForHost(value: String): Option[String] = None
+}
+
+
+private[spark] object ClusterScheduler {
+  /**
+   * Used to balance containers across hosts.
+   *
+   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+   * resource offers representing the order in which the offers should be used. The resource
+   * offers are ordered such that we'll allocate one container on each host before allocating a
+   * second container on any host, and so on, in order to reduce the damage if a host fails.
+   * Within each round, hosts with more offers come first.
+   *
+   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
+   * [o1, o5, o4, o2, o6, o3]
+   *
+   * @param map resource offers grouped by host
+   * @return every offer in `map` exactly once, in the order described above; empty if `map`
+   *         is empty
+   */
+  def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
+    // Offer lists ordered by size, largest first. The sort is stable, so hosts with equally
+    // many offers keep the map's iteration order.
+    val offerLists = map.keys.toSeq
+      .sortWith((left, right) => map(left).size > map(right).size)
+      .map(key => map(key))
+
+    // The longest list comes first, so its size is the number of rounds needed
+    val rounds = offerLists.headOption.map(_.size).getOrElse(0)
+
+    val prioritized = new ArrayBuffer[T](offerLists.size * 2)
+    for (round <- 0 until rounds; containers <- offerLists) {
+      // Take this host's round'th offer - if present
+      if (round < containers.size) {
+        prioritized += containers(round)
+      }
+    }
+    prioritized.toList
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/150615a3/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/150615a3/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/150615a3/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------