Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:11 UTC

[2/6] [SPARK-1132] Persisting Web UI through refactoring the SparkListener interface

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
new file mode 100644
index 0000000..346f2b7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.{Properties, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.Map
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST._
+
+import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.scheduler._
+import org.apache.spark.storage._
+import org.apache.spark._
+
+private[spark] object JsonProtocol {
+  private implicit val format = DefaultFormats
+
+  /** ------------------------------------------------- *
+   * JSON serialization methods for SparkListenerEvents |
+   * -------------------------------------------------- */
+
+  def sparkEventToJson(event: SparkListenerEvent): JValue = {
+    event match {
+      case stageSubmitted: SparkListenerStageSubmitted =>
+        stageSubmittedToJson(stageSubmitted)
+      case stageCompleted: SparkListenerStageCompleted =>
+        stageCompletedToJson(stageCompleted)
+      case taskStart: SparkListenerTaskStart =>
+        taskStartToJson(taskStart)
+      case taskGettingResult: SparkListenerTaskGettingResult =>
+        taskGettingResultToJson(taskGettingResult)
+      case taskEnd: SparkListenerTaskEnd =>
+        taskEndToJson(taskEnd)
+      case jobStart: SparkListenerJobStart =>
+        jobStartToJson(jobStart)
+      case jobEnd: SparkListenerJobEnd =>
+        jobEndToJson(jobEnd)
+      case environmentUpdate: SparkListenerEnvironmentUpdate =>
+        environmentUpdateToJson(environmentUpdate)
+      case blockManagerAdded: SparkListenerBlockManagerAdded =>
+        blockManagerAddedToJson(blockManagerAdded)
+      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
+        blockManagerRemovedToJson(blockManagerRemoved)
+      case unpersistRDD: SparkListenerUnpersistRDD =>
+        unpersistRDDToJson(unpersistRDD)
+
+      // Not used, but keeps compiler happy
+      case SparkListenerShutdown => JNothing
+    }
+  }
+
+  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
+    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
+    val properties = propertiesToJson(stageSubmitted.properties)
+    ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
+    ("Stage Info" -> stageInfo) ~
+    ("Properties" -> properties)
+  }
+
+  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
+    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
+    ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
+    ("Stage Info" -> stageInfo)
+  }
+
+  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
+    val taskInfo = taskStart.taskInfo
+    val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
+    ("Event" -> Utils.getFormattedClassName(taskStart)) ~
+    ("Stage ID" -> taskStart.stageId) ~
+    ("Task Info" -> taskInfoJson)
+  }
+
+  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
+    val taskInfo = taskGettingResult.taskInfo
+    val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
+    ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
+    ("Task Info" -> taskInfoJson)
+  }
+
+  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
+    val taskEndReason = taskEndReasonToJson(taskEnd.reason)
+    val taskInfo = taskEnd.taskInfo
+    val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
+    val taskMetrics = taskEnd.taskMetrics
+    val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
+    ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
+    ("Stage ID" -> taskEnd.stageId) ~
+    ("Task Type" -> taskEnd.taskType) ~
+    ("Task End Reason" -> taskEndReason) ~
+    ("Task Info" -> taskInfoJson) ~
+    ("Task Metrics" -> taskMetricsJson)
+  }
+
+  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
+    val properties = propertiesToJson(jobStart.properties)
+    ("Event" -> Utils.getFormattedClassName(jobStart)) ~
+    ("Job ID" -> jobStart.jobId) ~
+    ("Stage IDs" -> jobStart.stageIds) ~
+    ("Properties" -> properties)
+  }
+
+  def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
+    val jobResult = jobResultToJson(jobEnd.jobResult)
+    ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
+    ("Job ID" -> jobEnd.jobId) ~
+    ("Job Result" -> jobResult)
+  }
+
+  def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = {
+    val environmentDetails = environmentUpdate.environmentDetails
+    val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap)
+    val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
+    val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
+    val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
+    ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
+    ("JVM Information" -> jvmInformation) ~
+    ("Spark Properties" -> sparkProperties) ~
+    ("System Properties" -> systemProperties) ~
+    ("Classpath Entries" -> classpathEntries)
+  }
+
+  def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
+    val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
+    ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
+    ("Block Manager ID" -> blockManagerId) ~
+    ("Maximum Memory" -> blockManagerAdded.maxMem)
+  }
+
+  def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
+    val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
+    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
+    ("Block Manager ID" -> blockManagerId)
+  }
+
+  def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
+    ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
+    ("RDD ID" -> unpersistRDD.rddId)
+  }
+
+
+  /** ------------------------------------------------------------------- *
+   * JSON serialization methods for classes SparkListenerEvents depend on |
+   * -------------------------------------------------------------------- */
+
+  def stageInfoToJson(stageInfo: StageInfo): JValue = {
+    val rddInfo = rddInfoToJson(stageInfo.rddInfo)
+    val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
+    val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
+    ("Stage ID" -> stageInfo.stageId) ~
+    ("Stage Name" -> stageInfo.name) ~
+    ("Number of Tasks" -> stageInfo.numTasks) ~
+    ("RDD Info" -> rddInfo) ~
+    ("Submission Time" -> submissionTime) ~
+    ("Completion Time" -> completionTime) ~
+    ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
+  }
+
+  def taskInfoToJson(taskInfo: TaskInfo): JValue = {
+    ("Task ID" -> taskInfo.taskId) ~
+    ("Index" -> taskInfo.index) ~
+    ("Launch Time" -> taskInfo.launchTime) ~
+    ("Executor ID" -> taskInfo.executorId) ~
+    ("Host" -> taskInfo.host) ~
+    ("Locality" -> taskInfo.taskLocality.toString) ~
+    ("Getting Result Time" -> taskInfo.gettingResultTime) ~
+    ("Finish Time" -> taskInfo.finishTime) ~
+    ("Failed" -> taskInfo.failed) ~
+    ("Serialized Size" -> taskInfo.serializedSize)
+  }
+
+  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
+    val shuffleReadMetrics =
+      taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
+    val shuffleWriteMetrics =
+      taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
+    val updatedBlocks = taskMetrics.updatedBlocks.map { blocks =>
+        JArray(blocks.toList.map { case (id, status) =>
+          ("Block ID" -> blockIdToJson(id)) ~
+          ("Status" -> blockStatusToJson(status))
+        })
+      }.getOrElse(JNothing)
+    ("Host Name" -> taskMetrics.hostname) ~
+    ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
+    ("Executor Run Time" -> taskMetrics.executorRunTime) ~
+    ("Result Size" -> taskMetrics.resultSize) ~
+    ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
+    ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
+    ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
+    ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
+    ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
+    ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+    ("Updated Blocks" -> updatedBlocks)
+  }
+
+  def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
+    ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
+    ("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~
+    ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
+    ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
+    ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
+    ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead)
+  }
+
+  def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
+    ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
+    ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
+  }
+
+  def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
+    val reason = Utils.getFormattedClassName(taskEndReason)
+    val json = taskEndReason match {
+      case fetchFailed: FetchFailed =>
+        val blockManagerAddress = blockManagerIdToJson(fetchFailed.bmAddress)
+        ("Block Manager Address" -> blockManagerAddress) ~
+        ("Shuffle ID" -> fetchFailed.shuffleId) ~
+        ("Map ID" -> fetchFailed.mapId) ~
+        ("Reduce ID" -> fetchFailed.reduceId)
+      case exceptionFailure: ExceptionFailure =>
+        val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
+        val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
+        ("Class Name" -> exceptionFailure.className) ~
+        ("Description" -> exceptionFailure.description) ~
+        ("Stack Trace" -> stackTrace) ~
+        ("Metrics" -> metrics)
+      case _ => Utils.emptyJson
+    }
+    ("Reason" -> reason) ~ json
+  }
+
+  def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
+    ("Executor ID" -> blockManagerId.executorId) ~
+    ("Host" -> blockManagerId.host) ~
+    ("Port" -> blockManagerId.port) ~
+    ("Netty Port" -> blockManagerId.nettyPort)
+  }
+
+  def jobResultToJson(jobResult: JobResult): JValue = {
+    val result = Utils.getFormattedClassName(jobResult)
+    val json = jobResult match {
+      case JobSucceeded => Utils.emptyJson
+      case jobFailed: JobFailed =>
+        val exception = exceptionToJson(jobFailed.exception)
+        ("Exception" -> exception) ~
+        ("Failed Stage ID" -> jobFailed.failedStageId)
+    }
+    ("Result" -> result) ~ json
+  }
+
+  def rddInfoToJson(rddInfo: RDDInfo): JValue = {
+    val storageLevel = storageLevelToJson(rddInfo.storageLevel)
+    ("RDD ID" -> rddInfo.id) ~
+    ("Name" -> rddInfo.name) ~
+    ("Storage Level" -> storageLevel) ~
+    ("Number of Partitions" -> rddInfo.numPartitions) ~
+    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
+    ("Memory Size" -> rddInfo.memSize) ~
+    ("Disk Size" -> rddInfo.diskSize)
+  }
+
+  def storageLevelToJson(storageLevel: StorageLevel): JValue = {
+    ("Use Disk" -> storageLevel.useDisk) ~
+    ("Use Memory" -> storageLevel.useMemory) ~
+    ("Deserialized" -> storageLevel.deserialized) ~
+    ("Replication" -> storageLevel.replication)
+  }
+
+  def blockIdToJson(blockId: BlockId): JValue = {
+    val blockType = Utils.getFormattedClassName(blockId)
+    val json: JObject = blockId match {
+      case rddBlockId: RDDBlockId =>
+        ("RDD ID" -> rddBlockId.rddId) ~
+        ("Split Index" -> rddBlockId.splitIndex)
+      case shuffleBlockId: ShuffleBlockId =>
+        ("Shuffle ID" -> shuffleBlockId.shuffleId) ~
+        ("Map ID" -> shuffleBlockId.mapId) ~
+        ("Reduce ID" -> shuffleBlockId.reduceId)
+      case broadcastBlockId: BroadcastBlockId =>
+        "Broadcast ID" -> broadcastBlockId.broadcastId
+      case broadcastHelperBlockId: BroadcastHelperBlockId =>
+        ("Broadcast Block ID" -> blockIdToJson(broadcastHelperBlockId.broadcastId)) ~
+        ("Helper Type" -> broadcastHelperBlockId.hType)
+      case taskResultBlockId: TaskResultBlockId =>
+        "Task ID" -> taskResultBlockId.taskId
+      case streamBlockId: StreamBlockId =>
+        ("Stream ID" -> streamBlockId.streamId) ~
+        ("Unique ID" -> streamBlockId.uniqueId)
+      case tempBlockId: TempBlockId =>
+        val uuid = UUIDToJson(tempBlockId.id)
+        "Temp ID" -> uuid
+      case testBlockId: TestBlockId =>
+        "Test ID" -> testBlockId.id
+    }
+    ("Type" -> blockType) ~ json
+  }
+
+  def blockStatusToJson(blockStatus: BlockStatus): JValue = {
+    val storageLevel = storageLevelToJson(blockStatus.storageLevel)
+    ("Storage Level" -> storageLevel) ~
+    ("Memory Size" -> blockStatus.memSize) ~
+    ("Disk Size" -> blockStatus.diskSize)
+  }
+
+
+  /** ------------------------------ *
+   * Util JSON serialization methods |
+   * ------------------------------- */
+
+  def mapToJson(m: Map[String, String]): JValue = {
+    val jsonFields = m.map { case (k, v) => JField(k, JString(v)) }
+    JObject(jsonFields.toList)
+  }
+
+  def propertiesToJson(properties: Properties): JValue = {
+    Option(properties).map { p =>
+      mapToJson(p.asScala)
+    }.getOrElse(JNothing)
+  }
+
+  def UUIDToJson(id: UUID): JValue = {
+    ("Least Significant Bits" -> id.getLeastSignificantBits) ~
+    ("Most Significant Bits" -> id.getMostSignificantBits)
+  }
+
+  def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = {
+    JArray(stackTrace.map { case line =>
+      ("Declaring Class" -> line.getClassName) ~
+      ("Method Name" -> line.getMethodName) ~
+      ("File Name" -> line.getFileName) ~
+      ("Line Number" -> line.getLineNumber)
+    }.toList)
+  }
+
+  def exceptionToJson(exception: Exception): JValue = {
+    ("Message" -> exception.getMessage) ~
+    ("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
+  }
+
+
+  /** --------------------------------------------------- *
+   * JSON deserialization methods for SparkListenerEvents |
+   * ---------------------------------------------------- */
+
+  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+    val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted)
+    val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted)
+    val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
+    val taskGettingResult = Utils.getFormattedClassName(SparkListenerTaskGettingResult)
+    val taskEnd = Utils.getFormattedClassName(SparkListenerTaskEnd)
+    val jobStart = Utils.getFormattedClassName(SparkListenerJobStart)
+    val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
+    val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
+    val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded)
+    val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved)
+    val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
+
+    (json \ "Event").extract[String] match {
+      case `stageSubmitted` => stageSubmittedFromJson(json)
+      case `stageCompleted` => stageCompletedFromJson(json)
+      case `taskStart` => taskStartFromJson(json)
+      case `taskGettingResult` => taskGettingResultFromJson(json)
+      case `taskEnd` => taskEndFromJson(json)
+      case `jobStart` => jobStartFromJson(json)
+      case `jobEnd` => jobEndFromJson(json)
+      case `environmentUpdate` => environmentUpdateFromJson(json)
+      case `blockManagerAdded` => blockManagerAddedFromJson(json)
+      case `blockManagerRemoved` => blockManagerRemovedFromJson(json)
+      case `unpersistRDD` => unpersistRDDFromJson(json)
+    }
+  }
+
+  def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
+    val stageInfo = stageInfoFromJson(json \ "Stage Info")
+    val properties = propertiesFromJson(json \ "Properties")
+    SparkListenerStageSubmitted(stageInfo, properties)
+  }
+
+  def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
+    val stageInfo = stageInfoFromJson(json \ "Stage Info")
+    SparkListenerStageCompleted(stageInfo)
+  }
+
+  def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
+    val stageId = (json \ "Stage ID").extract[Int]
+    val taskInfo = taskInfoFromJson(json \ "Task Info")
+    SparkListenerTaskStart(stageId, taskInfo)
+  }
+
+  def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
+    val taskInfo = taskInfoFromJson(json \ "Task Info")
+    SparkListenerTaskGettingResult(taskInfo)
+  }
+
+  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
+    val stageId = (json \ "Stage ID").extract[Int]
+    val taskType = (json \ "Task Type").extract[String]
+    val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
+    val taskInfo = taskInfoFromJson(json \ "Task Info")
+    val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
+    SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, taskMetrics)
+  }
+
+  def jobStartFromJson(json: JValue): SparkListenerJobStart = {
+    val jobId = (json \ "Job ID").extract[Int]
+    val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
+    val properties = propertiesFromJson(json \ "Properties")
+    SparkListenerJobStart(jobId, stageIds, properties)
+  }
+
+  def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
+    val jobId = (json \ "Job ID").extract[Int]
+    val jobResult = jobResultFromJson(json \ "Job Result")
+    SparkListenerJobEnd(jobId, jobResult)
+  }
+
+  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
+    val environmentDetails = Map[String, Seq[(String, String)]](
+      "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
+      "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
+      "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
+      "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
+    SparkListenerEnvironmentUpdate(environmentDetails)
+  }
+
+  def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
+    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
+    val maxMem = (json \ "Maximum Memory").extract[Long]
+    SparkListenerBlockManagerAdded(blockManagerId, maxMem)
+  }
+
+  def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
+    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
+    SparkListenerBlockManagerRemoved(blockManagerId)
+  }
+
+  def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
+    SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
+  }
+
+
+  /** --------------------------------------------------------------------- *
+   * JSON deserialization methods for classes SparkListenerEvents depend on |
+   * ---------------------------------------------------------------------- */
+
+  def stageInfoFromJson(json: JValue): StageInfo = {
+    val stageId = (json \ "Stage ID").extract[Int]
+    val stageName = (json \ "Stage Name").extract[String]
+    val numTasks = (json \ "Number of Tasks").extract[Int]
+    val rddInfo = rddInfoFromJson(json \ "RDD Info")
+    val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
+    val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
+    val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
+
+    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfo)
+    stageInfo.submissionTime = submissionTime
+    stageInfo.completionTime = completionTime
+    stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
+    stageInfo
+  }
+
+  def taskInfoFromJson(json: JValue): TaskInfo = {
+    val taskId = (json \ "Task ID").extract[Long]
+    val index = (json \ "Index").extract[Int]
+    val launchTime = (json \ "Launch Time").extract[Long]
+    val executorId = (json \ "Executor ID").extract[String]
+    val host = (json \ "Host").extract[String]
+    val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+    val gettingResultTime = (json \ "Getting Result Time").extract[Long]
+    val finishTime = (json \ "Finish Time").extract[Long]
+    val failed = (json \ "Failed").extract[Boolean]
+    val serializedSize = (json \ "Serialized Size").extract[Int]
+
+    val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+    taskInfo.gettingResultTime = gettingResultTime
+    taskInfo.finishTime = finishTime
+    taskInfo.failed = failed
+    taskInfo.serializedSize = serializedSize
+    taskInfo
+  }
+
+  def taskMetricsFromJson(json: JValue): TaskMetrics = {
+    val metrics = new TaskMetrics
+    metrics.hostname = (json \ "Host Name").extract[String]
+    metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
+    metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
+    metrics.resultSize = (json \ "Result Size").extract[Long]
+    metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
+    metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
+    metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
+    metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
+    metrics.shuffleReadMetrics =
+      Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
+    metrics.shuffleWriteMetrics =
+      Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+    metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
+      value.extract[List[JValue]].map { block =>
+        val id = blockIdFromJson(block \ "Block ID")
+        val status = blockStatusFromJson(block \ "Status")
+        (id, status)
+      }
+    }
+    metrics
+  }
+
+  def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
+    val metrics = new ShuffleReadMetrics
+    metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
+    metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
+    metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
+    metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
+    metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
+    metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
+    metrics
+  }
+
+  def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
+    val metrics = new ShuffleWriteMetrics
+    metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
+    metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
+    metrics
+  }
+
+  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+    val success = Utils.getFormattedClassName(Success)
+    val resubmitted = Utils.getFormattedClassName(Resubmitted)
+    val fetchFailed = Utils.getFormattedClassName(FetchFailed)
+    val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
+    val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
+    val taskKilled = Utils.getFormattedClassName(TaskKilled)
+    val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
+    val unknownReason = Utils.getFormattedClassName(UnknownReason)
+
+    (json \ "Reason").extract[String] match {
+      case `success` => Success
+      case `resubmitted` => Resubmitted
+      case `fetchFailed` =>
+        val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
+        val shuffleId = (json \ "Shuffle ID").extract[Int]
+        val mapId = (json \ "Map ID").extract[Int]
+        val reduceId = (json \ "Reduce ID").extract[Int]
+        new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
+      case `exceptionFailure` =>
+        val className = (json \ "Class Name").extract[String]
+        val description = (json \ "Description").extract[String]
+        val stackTrace = stackTraceFromJson(json \ "Stack Trace")
+        val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
+        new ExceptionFailure(className, description, stackTrace, metrics)
+      case `taskResultLost` => TaskResultLost
+      case `taskKilled` => TaskKilled
+      case `executorLostFailure` => ExecutorLostFailure
+      case `unknownReason` => UnknownReason
+    }
+  }
+
+  def blockManagerIdFromJson(json: JValue): BlockManagerId = {
+    val executorId = (json \ "Executor ID").extract[String]
+    val host = (json \ "Host").extract[String]
+    val port = (json \ "Port").extract[Int]
+    val nettyPort = (json \ "Netty Port").extract[Int]
+    BlockManagerId(executorId, host, port, nettyPort)
+  }
+
+  def jobResultFromJson(json: JValue): JobResult = {
+    val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
+    val jobFailed = Utils.getFormattedClassName(JobFailed)
+
+    (json \ "Result").extract[String] match {
+      case `jobSucceeded` => JobSucceeded
+      case `jobFailed` =>
+        val exception = exceptionFromJson(json \ "Exception")
+        val failedStageId = (json \ "Failed Stage ID").extract[Int]
+        new JobFailed(exception, failedStageId)
+    }
+  }
+
+  def rddInfoFromJson(json: JValue): RDDInfo = {
+    val rddId = (json \ "RDD ID").extract[Int]
+    val name = (json \ "Name").extract[String]
+    val storageLevel = storageLevelFromJson(json \ "Storage Level")
+    val numPartitions = (json \ "Number of Partitions").extract[Int]
+    val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
+    val memSize = (json \ "Memory Size").extract[Long]
+    val diskSize = (json \ "Disk Size").extract[Long]
+
+    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
+    rddInfo.numCachedPartitions = numCachedPartitions
+    rddInfo.memSize = memSize
+    rddInfo.diskSize = diskSize
+    rddInfo
+  }
+
+  def storageLevelFromJson(json: JValue): StorageLevel = {
+    val useDisk = (json \ "Use Disk").extract[Boolean]
+    val useMemory = (json \ "Use Memory").extract[Boolean]
+    val deserialized = (json \ "Deserialized").extract[Boolean]
+    val replication = (json \ "Replication").extract[Int]
+    StorageLevel(useDisk, useMemory, deserialized, replication)
+  }
+
+  def blockIdFromJson(json: JValue): BlockId = {
+    val rddBlockId = Utils.getFormattedClassName(RDDBlockId)
+    val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId)
+    val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId)
+    val broadcastHelperBlockId = Utils.getFormattedClassName(BroadcastHelperBlockId)
+    val taskResultBlockId = Utils.getFormattedClassName(TaskResultBlockId)
+    val streamBlockId = Utils.getFormattedClassName(StreamBlockId)
+    val tempBlockId = Utils.getFormattedClassName(TempBlockId)
+    val testBlockId = Utils.getFormattedClassName(TestBlockId)
+
+    (json \ "Type").extract[String] match {
+      case `rddBlockId` =>
+        val rddId = (json \ "RDD ID").extract[Int]
+        val splitIndex = (json \ "Split Index").extract[Int]
+        new RDDBlockId(rddId, splitIndex)
+      case `shuffleBlockId` =>
+        val shuffleId = (json \ "Shuffle ID").extract[Int]
+        val mapId = (json \ "Map ID").extract[Int]
+        val reduceId = (json \ "Reduce ID").extract[Int]
+        new ShuffleBlockId(shuffleId, mapId, reduceId)
+      case `broadcastBlockId` =>
+        val broadcastId = (json \ "Broadcast ID").extract[Long]
+        new BroadcastBlockId(broadcastId)
+      case `broadcastHelperBlockId` =>
+        val broadcastBlockId =
+          blockIdFromJson(json \ "Broadcast Block ID").asInstanceOf[BroadcastBlockId]
+        val hType = (json \ "Helper Type").extract[String]
+        new BroadcastHelperBlockId(broadcastBlockId, hType)
+      case `taskResultBlockId` =>
+        val taskId = (json \ "Task ID").extract[Long]
+        new TaskResultBlockId(taskId)
+      case `streamBlockId` =>
+        val streamId = (json \ "Stream ID").extract[Int]
+        val uniqueId = (json \ "Unique ID").extract[Long]
+        new StreamBlockId(streamId, uniqueId)
+      case `tempBlockId` =>
+        val tempId = UUIDFromJson(json \ "Temp ID")
+        new TempBlockId(tempId)
+      case `testBlockId` =>
+        val testId = (json \ "Test ID").extract[String]
+        new TestBlockId(testId)
+    }
+  }
+
+  def blockStatusFromJson(json: JValue): BlockStatus = {
+    val storageLevel = storageLevelFromJson(json \ "Storage Level")
+    val memorySize = (json \ "Memory Size").extract[Long]
+    val diskSize = (json \ "Disk Size").extract[Long]
+    BlockStatus(storageLevel, memorySize, diskSize)
+  }
+
+
+  /** -------------------------------- *
+   * Util JSON deserialization methods |
+   * --------------------------------- */
+
+  def mapFromJson(json: JValue): Map[String, String] = {
+    val jsonFields = json.asInstanceOf[JObject].obj
+    jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
+  }
+
+  def propertiesFromJson(json: JValue): Properties = {
+    val properties = new Properties()
+    if (json != JNothing) {
+      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
+    }
+    properties
+  }
+
+  def UUIDFromJson(json: JValue): UUID = {
+    val leastSignificantBits = (json \ "Least Significant Bits").extract[Long]
+    val mostSignificantBits = (json \ "Most Significant Bits").extract[Long]
+    new UUID(leastSignificantBits, mostSignificantBits)
+  }
+
+  def stackTraceFromJson(json: JValue): Array[StackTraceElement] = {
+    json.extract[List[JValue]].map { line =>
+      val declaringClass = (line \ "Declaring Class").extract[String]
+      val methodName = (line \ "Method Name").extract[String]
+      val fileName = (line \ "File Name").extract[String]
+      val lineNumber = (line \ "Line Number").extract[Int]
+      new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
+    }.toArray
+  }
+
+  def exceptionFromJson(json: JValue): Exception = {
+    val e = new Exception((json \ "Message").extract[String])
+    e.setStackTrace(stackTraceFromJson(json \ "Stack Trace"))
+    e
+  }
+
+}

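The JsonProtocol above is deliberately symmetric: every *ToJson method has a matching *FromJson method keyed on the same field names, so an event written to the event log can be reconstructed losslessly when the UI is replayed. A minimal round-trip sketch follows (illustrative only, not part of this patch; it assumes json4s-jackson on the classpath, as Spark core already uses, and is placed in the org.apache.spark package because JsonProtocol is private[spark]; the example object name is hypothetical):

package org.apache.spark

import org.json4s.jackson.JsonMethods.{compact, parse, render}

import org.apache.spark.scheduler.SparkListenerUnpersistRDD
import org.apache.spark.util.JsonProtocol

// Hypothetical demo object, not part of the commit.
object JsonProtocolRoundTrip {
  def main(args: Array[String]): Unit = {
    val event = SparkListenerUnpersistRDD(42)

    // Event -> JSON string, roughly what an event logger would persist,
    // e.g. {"Event":"SparkListenerUnpersistRDD","RDD ID":42}
    val json = compact(render(JsonProtocol.sparkEventToJson(event)))

    // JSON string -> event, roughly what a replaying listener bus would do
    val restored = JsonProtocol.sparkEventFromJson(parse(json))
    assert(restored == event)
  }
}
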
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 38a275d..13d9dbd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -32,11 +32,11 @@ import scala.reflect.ClassTag
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.json4s._
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
-
+import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /**
  * Various utility methods used by Spark.
@@ -245,7 +245,7 @@ private[spark] object Utils extends Logging {
     val userCred = securityMgr.getSecretKey()
     if (userCred == null) throw new Exception("Secret key is null with authentication on")
     val userInfo = securityMgr.getHttpUser()  + ":" + userCred
-    new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), uri.getPath(), 
+    new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), uri.getPath(),
       uri.getQuery(), uri.getFragment())
   }
 
@@ -282,7 +282,7 @@ private[spark] object Utils extends Logging {
         uc.setConnectTimeout(timeout)
         uc.setReadTimeout(timeout)
         uc.connect()
-        val in = uc.getInputStream();
+        val in = uc.getInputStream()
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
@@ -328,8 +328,7 @@ private[spark] object Utils extends Logging {
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val conf = SparkHadoopUtil.get.newConfiguration()
-        val fs = FileSystem.get(uri, conf)
+        val fs = getHadoopFileSystem(uri)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
@@ -500,7 +499,7 @@ private[spark] object Utils extends Logging {
    * millisecond.
    */
   def getUsedTimeMs(startTimeMs: Long): String = {
-    return " " + (System.currentTimeMillis - startTimeMs) + " ms"
+    " " + (System.currentTimeMillis - startTimeMs) + " ms"
   }
 
   /**
@@ -789,7 +788,7 @@ private[spark] object Utils extends Logging {
     }
     var i = 0
     while (i < s.length) {
-      var nextChar = s.charAt(i)
+      val nextChar = s.charAt(i)
       if (inDoubleQuote) {
         if (nextChar == '"') {
           inDoubleQuote = false
@@ -895,4 +894,27 @@ private[spark] object Utils extends Logging {
     }
     count
   }
+
+  /** Return the class name of the given object, removing all dollar signs */
+  def getFormattedClassName(obj: AnyRef) = {
+    obj.getClass.getSimpleName.replace("$", "")
+  }
+
+  /** Return an option that translates JNothing to None */
+  def jsonOption(json: JValue): Option[JValue] = {
+    json match {
+      case JNothing => None
+      case value: JValue => Some(value)
+    }
+  }
+
+  /** Return an empty JSON object */
+  def emptyJson = JObject(List[JField]())
+
+  /**
+   * Return a Hadoop FileSystem with the scheme encoded in the given path.
+   */
+  def getHadoopFileSystem(path: URI): FileSystem = {
+    FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
+  }
 }

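The Utils additions are small helpers the serializer leans on: getFormattedClassName strips the trailing '$' from companion-object names so they can serve as the "Event"/"Reason" tags, and jsonOption maps a JNothing lookup to None. A rough usage sketch (illustrative only, not part of this patch; it again assumes spark-package access since Utils is private[spark], and the demo object name is hypothetical):

package org.apache.spark

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.scheduler.JobSucceeded
import org.apache.spark.util.Utils

// Hypothetical demo object, not part of the commit.
object UtilsJsonHelpersDemo {
  def main(args: Array[String]): Unit = {
    // Companion objects have simple names ending in '$'; the helper strips it,
    // printing "JobSucceeded"
    println(Utils.getFormattedClassName(JobSucceeded))

    val json = parse("""{"Submission Time": 1395000000000}""")
    // Present field -> Some(JInt(1395000000000)); absent field -> None
    println(Utils.jsonOption(json \ "Submission Time"))
    println(Utils.jsonOption(json \ "Completion Time"))
  }
}
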
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 40e853c..c6b65c7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -43,6 +43,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.storage.StorageLevel;
@@ -402,16 +403,16 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void javaDoubleRDDHistoGram() {
-   JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   // Test using generated buckets
-   Tuple2<double[], long[]> results = rdd.histogram(2);
-   double[] expected_buckets = {1.0, 2.5, 4.0};
-   long[] expected_counts = {2, 2};
-   Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
-   Assert.assertArrayEquals(expected_counts, results._2);
-   // Test with provided buckets
-   long[] histogram = rdd.histogram(expected_buckets);
-   Assert.assertArrayEquals(expected_counts, histogram);
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    // Test using generated buckets
+    Tuple2<double[], long[]> results = rdd.histogram(2);
+    double[] expected_buckets = {1.0, 2.5, 4.0};
+    long[] expected_counts = {2, 2};
+    Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
+    Assert.assertArrayEquals(expected_counts, results._2);
+    // Test with provided buckets
+    long[] histogram = rdd.histogram(expected_buckets);
+    Assert.assertArrayEquals(expected_counts, histogram);
   }
 
   @Test
@@ -570,7 +571,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void iterator() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContext(0, 0, 0, false, false, null);
+    TaskContext context = new TaskContext(0, 0, 0, false, false, new TaskMetrics());
     Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index ea936e8..b86923f 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -23,7 +23,8 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage._
 
 // TODO: Test the CacheManager's thread-safety aspects
 class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
@@ -54,12 +55,12 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     expecting {
       blockManager.get(RDDBlockId(0, 0)).andReturn(None)
       blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
-        true).andReturn(0)
+        true).andStubReturn(Seq[(BlockId, BlockStatus)]())
     }
 
     whenExecuting(blockManager) {
       val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = null)
+        taskMetrics = TaskMetrics.empty())
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }
@@ -72,7 +73,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
     whenExecuting(blockManager) {
       val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = null)
+        taskMetrics = TaskMetrics.empty())
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(5, 6, 7))
     }
@@ -86,7 +87,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
     whenExecuting(blockManager) {
       val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
-        taskMetrics = null)
+        taskMetrics = TaskMetrics.empty())
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 20c503d..7a39d1a 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.matchers.ShouldMatchers
 
 import org.apache.spark.SparkContext._
-import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListener}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 
 /**
  * Test suite for cancelling running jobs. We run the cancellation tasks for single job action
@@ -89,7 +89,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
 
     // Add a listener to release the semaphore once any tasks are launched.
     val sem = new Semaphore(0)
-    sc.dagScheduler.addSparkListener(new SparkListener {
+    sc.addSparkListener(new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart) {
         sem.release()
       }
@@ -161,7 +161,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
     {
       // Add a listener to release the semaphore once any tasks are launched.
       val sem = new Semaphore(0)
-      sc.dagScheduler.addSparkListener(new SparkListener {
+      sc.addSparkListener(new SparkListener {
         override def onTaskStart(taskStart: SparkListenerTaskStart) {
           sem.release()
         }
@@ -191,7 +191,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
     {
       // Add a listener to release the semaphore once any tasks are launched.
       val sem = new Semaphore(0)
-      sc.dagScheduler.addSparkListener(new SparkListener {
+      sc.addSparkListener(new SparkListener {
         override def onTaskStart(taskStart: SparkListenerTaskStart) {
           sem.release()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 0bac78d..6e7fd55 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -27,8 +27,11 @@ import org.apache.hadoop.fs.Path
 import scala.collection.Map
 import scala.sys.process._
 import scala.util.Try
+
 import org.apache.hadoop.io.{Text, LongWritable}
 
+import org.apache.spark.executor.TaskMetrics
+
 class PipedRDDSuite extends FunSuite with SharedSparkContext {
 
   test("basic pipe") {
@@ -151,7 +154,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
       val hadoopPart1 = generateFakeHadoopPartition()
       val pipedRdd = new PipedRDD(nums, "printenv " + varName)
       val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = null)
+        taskMetrics = TaskMetrics.empty())
       val rddIter = pipedRdd.compute(hadoopPart1, tContext)
       val arr = rddIter.toArray
       assert(arr(0) == "/some/path")

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 3bb9367..b543471 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import org.scalatest.{FunSuite, PrivateMethodTester}
 
-import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler}
+import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
@@ -32,7 +32,7 @@ class SparkContextSchedulerCreationSuite
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
     sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
-    val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
+    val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
     sched.asInstanceOf[TaskSchedulerImpl]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index bae3b37..9f2924c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -20,12 +20,9 @@ package org.apache.spark.deploy
 import java.io.File
 import java.util.Date
 
+import com.fasterxml.jackson.core.JsonParseException
 import org.json4s._
-
-import org.json4s.JValue
 import org.json4s.jackson.JsonMethods
-import com.fasterxml.jackson.core.JsonParseException
-
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
@@ -96,7 +93,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createAppInfo() : ApplicationInfo = {
     val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
-      "id", createAppDesc(), JsonConstants.submitDate, null, "appUriStr", Int.MaxValue)
+      "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
     appInfo.endTime = JsonConstants.currTimeInMillis
     appInfo
   }
@@ -148,12 +145,12 @@ object JsonConstants {
   val submitDate = new Date(123456789)
   val appInfoJsonStr =
     """
-      |{"starttime":3,"id":"id","name":"name","appuiurl":"appUriStr",
+      |{"starttime":3,"id":"id","name":"name",
       |"cores":4,"user":"%s",
       |"memoryperslave":1234,"submitdate":"%s",
       |"state":"WAITING","duration":%d}
     """.format(System.getProperty("user.name", "<unknown>"),
-        submitDate.toString, (currTimeInMillis - appInfoStartTime)).stripMargin
+        submitDate.toString, currTimeInMillis - appInfoStartTime).stripMargin
 
   val workerInfoJsonStr =
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ad890b4..c97543f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -94,7 +94,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     cacheLocations.clear()
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster(conf)
-    scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
+    scheduler = new DAGScheduler(
+        taskScheduler,
+        sc.listenerBus,
+        mapOutputTracker,
+        blockManagerMaster,
+        sc.env) {
       override def runLocally(job: ActiveJob) {
         // don't bother with the thread while unit testing
         runLocallyWithinThread(job)
@@ -422,15 +427,15 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   private def assertDataStructuresEmpty = {
     assert(scheduler.pendingTasks.isEmpty)
     assert(scheduler.activeJobs.isEmpty)
-    assert(scheduler.failed.isEmpty)
-    assert(scheduler.idToActiveJob.isEmpty)
+    assert(scheduler.failedStages.isEmpty)
+    assert(scheduler.stageIdToActiveJob.isEmpty)
     assert(scheduler.jobIdToStageIds.isEmpty)
     assert(scheduler.stageIdToJobIds.isEmpty)
     assert(scheduler.stageIdToStage.isEmpty)
     assert(scheduler.stageToInfos.isEmpty)
     assert(scheduler.resultStageToJob.isEmpty)
-    assert(scheduler.running.isEmpty)
+    assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
-    assert(scheduler.waiting.isEmpty)
+    assert(scheduler.waitingStages.isEmpty)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
deleted file mode 100644
index 25fe63c..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
-  val WAIT_TIMEOUT_MILLIS = 10000
-
-  test("inner method") {
-    sc = new SparkContext("local", "joblogger")
-    val joblogger = new JobLogger {
-      def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
-      def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
-      def getRddNameTest(rdd: RDD[_]) = getRddName(rdd)
-      def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) 
-    }
-    type MyRDD = RDD[(Int, Int)]
-    def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
-      val maxPartition = numPartitions - 1
-      new MyRDD(sc, dependencies) {
-        override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
-          throw new RuntimeException("should not be reached")
-        override def getPartitions = (0 to maxPartition).map(i => new Partition {
-          override def index = i
-        }).toArray
-      }
-    }
-    val jobID = 5
-    val parentRdd = makeRdd(4, Nil)
-    val shuffleDep = new ShuffleDependency(parentRdd, null)
-    val rootRdd = makeRdd(4, List(shuffleDep))
-    val shuffleMapStage =
-      new Stage(1, parentRdd, parentRdd.partitions.size, Some(shuffleDep), Nil, jobID, None)
-    val rootStage =
-      new Stage(0, rootRdd, rootRdd.partitions.size, None, List(shuffleMapStage), jobID, None)
-    val rootStageInfo = new StageInfo(rootStage)
-
-    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
-    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
-    parentRdd.setName("MyRDD")
-    joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
-    joblogger.createLogWriterTest(jobID)
-    joblogger.getJobIDtoPrintWriter.size should be (1)
-    joblogger.buildJobDepTest(jobID, rootStage)
-    joblogger.getJobIDToStages.get(jobID).get.size should be (2)
-    joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
-    joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
-    joblogger.closeLogWriterTest(jobID)
-    joblogger.getStageIDToJobID.size should be (0)
-    joblogger.getJobIDToStages.size should be (0)
-    joblogger.getJobIDtoPrintWriter.size should be (0)
-  }
-  
-  test("inner variables") {
-    sc = new SparkContext("local[4]", "joblogger")
-    val joblogger = new JobLogger {
-      override protected def closeLogWriter(jobID: Int) = 
-        getJobIDtoPrintWriter.get(jobID).foreach { fileWriter => 
-          fileWriter.close()
-        }
-    }
-    sc.addSparkListener(joblogger)
-    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
-    rdd.reduceByKey(_+_).collect()
-
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-
-    val user = System.getProperty("user.name",  SparkContext.SPARK_UNKNOWN_USER)
-    
-    joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
-    joblogger.getJobIDtoPrintWriter.size should be (1)
-    joblogger.getStageIDToJobID.size should be (2)
-    joblogger.getStageIDToJobID.get(0) should be (Some(0))
-    joblogger.getStageIDToJobID.get(1) should be (Some(0))
-    joblogger.getJobIDToStages.size should be (1)
-  }
-  
-  
-  test("interface functions") {
-    sc = new SparkContext("local[4]", "joblogger")
-    val joblogger = new JobLogger {
-      var onTaskEndCount = 0
-      var onJobEndCount = 0 
-      var onJobStartCount = 0
-      var onStageCompletedCount = 0
-      var onStageSubmittedCount = 0
-      override def onTaskEnd(taskEnd: SparkListenerTaskEnd)  = onTaskEndCount += 1
-      override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
-      override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
-      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
-      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
-    }
-    sc.addSparkListener(joblogger)
-    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
-    rdd.reduceByKey(_+_).collect()
-
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-
-    joblogger.onJobStartCount should be (1)
-    joblogger.onJobEndCount should be (1)
-    joblogger.onTaskEndCount should be (8)
-    joblogger.onStageSubmittedCount should be (2)
-    joblogger.onStageCompletedCount should be (2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7c4f2b4..a25ce35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.{Buffer, HashSet}
+import scala.collection.mutable
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.matchers.ShouldMatchers
 
 import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.SparkContext._
+import org.apache.spark.executor.TaskMetrics
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     with BeforeAndAfter with BeforeAndAfterAll {
@@ -38,43 +39,76 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     System.clearProperty("spark.akka.frameSize")
   }
 
+  test("basic creation and shutdown of LiveListenerBus") {
+    val counter = new BasicJobCounter
+    val bus = new LiveListenerBus
+    bus.addListener(counter)
+
+    // Listener bus hasn't started yet, so posting events should not increment counter
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    assert(counter.count === 0)
+
+    // Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
+    bus.start()
+    Thread.sleep(1000)
+    assert(counter.count === 5)
+
+    // After listener bus has stopped, posting events should not increment counter
+    bus.stop()
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    assert(counter.count === 5)
+
+    // Listener bus must not be started twice
+    intercept[IllegalStateException] {
+      val bus = new LiveListenerBus
+      bus.start()
+      bus.start()
+    }
+
+    // ... or stopped before starting
+    intercept[IllegalStateException] {
+      val bus = new LiveListenerBus
+      bus.stop()
+    }
+  }
+
   test("basic creation of StageInfo") {
-    val listener = new SaveStageInfo
+    val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(x => x.toString)
+    val rdd2 = rdd1.map(_.toString)
     rdd2.setName("Target RDD")
     rdd2.count
 
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 
     listener.stageInfos.size should be {1}
-    val first = listener.stageInfos.head
-    first.rddName should be {"Target RDD"}
-    first.numTasks should be {4}
-    first.numPartitions should be {4}
-    first.submissionTime should be ('defined)
-    first.completionTime should be ('defined)
-    first.taskInfos.length should be {4}
+    val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
+    stageInfo.rddInfo.name should be {"Target RDD"}
+    stageInfo.numTasks should be {4}
+    stageInfo.rddInfo.numPartitions should be {4}
+    stageInfo.submissionTime should be ('defined)
+    stageInfo.completionTime should be ('defined)
+    taskInfoMetrics.length should be {4}
   }
 
   test("StageInfo with fewer tasks than partitions") {
-    val listener = new SaveStageInfo
+    val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(x => x.toString)
+    val rdd2 = rdd1.map(_.toString)
     sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
 
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 
     listener.stageInfos.size should be {1}
-    val first = listener.stageInfos.head
-    first.numTasks should be {2}
-    first.numPartitions should be {4}
+    val (stageInfo, _) = listener.stageInfos.head
+    stageInfo.numTasks should be {2}
+    stageInfo.rddInfo.numPartitions should be {4}
   }
 
   test("local metrics") {
-    val listener = new SaveStageInfo
+    val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
     //just to make sure some of the tasks take a noticeable amount of time
@@ -84,45 +118,45 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
       i
     }
 
-    val d = sc.parallelize(0 to 1e4.toInt, 64).map{i => w(i)}
+    val d = sc.parallelize(0 to 1e4.toInt, 64).map(w)
     d.count()
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (1)
 
-    val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
-
-    val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2")
-
-    val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)}
+    val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
+    val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
+    val d4 = d2.cogroup(d3, 64).map { case (k, (v1, v2)) =>
+      w(k) -> (v1.size, v2.size)
+    }
     d4.setName("A Cogroup")
-
     d4.collectAsMap()
 
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (4)
-    listener.stageInfos.foreach { stageInfo =>
-      /* small test, so some tasks might take less than 1 millisecond, but average should be greater
-       * than 0 ms. */
-      checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
+    listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
+      /**
+       * Small test, so some tasks might take less than 1 millisecond, but average should be greater
+       * than 0 ms.
+       */
       checkNonZeroAvg(
-        stageInfo.taskInfos.map{_._2.executorRunTime.toLong},
+        taskInfoMetrics.map(_._2.executorRunTime),
         stageInfo + " executorRunTime")
       checkNonZeroAvg(
-        stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
+        taskInfoMetrics.map(_._2.executorDeserializeTime),
         stageInfo + " executorDeserializeTime")
-      if (stageInfo.rddName == d4.name) {
+      if (stageInfo.rddInfo.name == d4.name) {
         checkNonZeroAvg(
-          stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
+          taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
           stageInfo + " fetchWaitTime")
       }
 
-      stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
+      taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
-        if (stageInfo.rddName == d2.name || stageInfo.rddName == d3.name) {
+        if (stageInfo.rddInfo.name == d2.name || stageInfo.rddInfo.name == d3.name) {
           taskMetrics.shuffleWriteMetrics should be ('defined)
           taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
         }
-        if (stageInfo.rddName == d4.name) {
+        if (stageInfo.rddInfo.name == d4.name) {
           taskMetrics.shuffleReadMetrics should be ('defined)
           val sm = taskMetrics.shuffleReadMetrics.get
           sm.totalBlocksFetched should be > (0)
@@ -142,10 +176,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     System.setProperty("spark.akka.frameSize", "1")
     val akkaFrameSize =
       sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
+    val result = sc.parallelize(Seq(1), 1)
+      .map { x => 1.to(akkaFrameSize).toArray }
+      .reduce { case (x, y) => x }
     assert(result === 1.to(akkaFrameSize).toArray)
 
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     val TASK_INDEX = 0
     assert(listener.startedTasks.contains(TASK_INDEX))
     assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -157,13 +193,13 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     sc.addSparkListener(listener)
  
     // Make a task whose result is smaller than the akka frame size
-    val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+    val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
     assert(result === 2)
 
-    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     val TASK_INDEX = 0
     assert(listener.startedTasks.contains(TASK_INDEX))
-    assert(listener.startedGettingResultTasks.isEmpty == true)
+    assert(listener.startedGettingResultTasks.isEmpty)
     assert(listener.endedTasks.contains(TASK_INDEX))
   }
 
@@ -204,17 +240,33 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     assert(m.sum / m.size.toDouble > 0.0, msg)
   }
 
-  class SaveStageInfo extends SparkListener {
-    val stageInfos = Buffer[StageInfo]()
+  class BasicJobCounter extends SparkListener {
+    var count = 0
+    override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+  }
+
+  class SaveStageAndTaskInfo extends SparkListener {
+    val stageInfos = mutable.Map[StageInfo, Seq[(TaskInfo, TaskMetrics)]]()
+    var taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+    override def onTaskEnd(task: SparkListenerTaskEnd) {
+      val info = task.taskInfo
+      val metrics = task.taskMetrics
+      if (info != null && metrics != null) {
+        taskInfoMetrics += ((info, metrics))
+      }
+    }
+
     override def onStageCompleted(stage: SparkListenerStageCompleted) {
-      stageInfos += stage.stage
+      stageInfos(stage.stageInfo) = taskInfoMetrics
+      taskInfoMetrics = mutable.Buffer.empty
     }
   }
 
   class SaveTaskEvents extends SparkListener {
-    val startedTasks = new HashSet[Int]()
-    val startedGettingResultTasks = new HashSet[Int]()
-    val endedTasks = new HashSet[Int]()
+    val startedTasks = new mutable.HashSet[Int]()
+    val startedGettingResultTasks = new mutable.HashSet[Int]()
+    val endedTasks = new mutable.HashSet[Int]()
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
       startedTasks += taskStart.taskInfo.index
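
The LiveListenerBus test added above pins down the bus lifecycle: events posted before start() are buffered and flushed once start() is called, posts after stop() are dropped, and starting twice or stopping before starting throws. A minimal stand-alone sketch of that lifecycle follows; it uses only the calls exercised by the test, the one-second sleep mirrors the test's way of waiting for the asynchronous flush, and, since LiveListenerBus is not public API, it assumes code living in an org.apache.spark package as the suite does.

    import org.apache.spark.scheduler._

    // Sketch: drive a LiveListenerBus directly, outside of any SparkContext.
    class JobCounter extends SparkListener {
      var count = 0
      override def onJobEnd(job: SparkListenerJobEnd) = count += 1
    }

    val bus = new LiveListenerBus
    val counter = new JobCounter
    bus.addListener(counter)

    bus.post(SparkListenerJobEnd(0, JobSucceeded))  // buffered: the bus has not started yet
    bus.start()                                     // flushes buffered events asynchronously
    Thread.sleep(1000)                              // same crude wait the test uses
    assert(counter.count == 1)
    bus.stop()                                      // later posts are dropped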

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 6b0800a..9274e01 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.scalatest.FunSuite
 
 import org.apache.spark._
@@ -270,9 +268,9 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     val taskScheduler = new TaskSchedulerImpl(sc) 
     taskScheduler.initialize(new FakeSchedulerBackend)
     // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    var dagScheduler = new DAGScheduler(taskScheduler) {
+    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
       override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorGained(execId: String, host: String) {}
+      override def executorAdded(execId: String, host: String) {}
     }
 
     val numFreeCores = 1
@@ -291,7 +289,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
       assert(1 === taskDescriptions.length)
       taskDescriptions(0).executorId
     }
-    var count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
     assert(count > 0)
     assert(count < numTrials)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 73153d2..9af5d3a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -26,7 +26,9 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.FakeClock
 
-class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
+class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
+  extends DAGScheduler(sc) {
+
   override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     taskScheduler.startedTasks += taskInfo.index
   }
@@ -41,7 +43,7 @@ class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(ta
     taskScheduler.endedTasks(taskInfo.index) = reason
   }
 
-  override def executorGained(execId: String, host: String) {}
+  override def executorAdded(execId: String, host: String) {}
 
   override def executorLost(execId: String) {}
 
@@ -66,7 +68,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
 
   val executors = new mutable.HashMap[String, String] ++ liveExecutors
 
-  dagScheduler = new FakeDAGScheduler(this)
+  dagScheduler = new FakeDAGScheduler(sc, this)
 
   def removeExecutor(execId: String): Unit = executors -= execId
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 1036b9f..e83cd55 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -28,7 +28,8 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
 
@@ -57,7 +58,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     conf.set("spark.driver.port", boundPort.toString)
 
     master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
@@ -492,12 +494,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
-    assert(store.getSingle("a1").isDefined, "a2 was not in store")
-    assert(store.getSingle("a2").isDefined, "a3 was not in store")
-    assert(store.getSingle("a3").isDefined, "a1 was not in store")
-    assert(store.getSingle("a1").isDefined, "a2 was not in store")
-    assert(store.getSingle("a2").isDefined, "a3 was not in store")
-    assert(store.getSingle("a3").isDefined, "a1 was not in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    assert(store.getSingle("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
     store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
     assert(store.getSingle("a1") == None, "a1 was in store")
@@ -663,6 +662,60 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     }
   }
 
+  test("updated block statuses") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
+    val list = List.fill(2)(new Array[Byte](200))
+    val bigList = List.fill(8)(new Array[Byte](200))
+
+    // 1 updated block (i.e. list1)
+    val updatedBlocks1 =
+      store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(updatedBlocks1.size === 1)
+    assert(updatedBlocks1.head._1 === TestBlockId("list1"))
+    assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
+
+    // 1 updated block (i.e. list2)
+    val updatedBlocks2 =
+      store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    assert(updatedBlocks2.size === 1)
+    assert(updatedBlocks2.head._1 === TestBlockId("list2"))
+    assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
+
+    // 2 updated blocks - list1 is kicked out of memory while list3 is added
+    val updatedBlocks3 =
+      store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(updatedBlocks3.size === 2)
+    updatedBlocks3.foreach { case (id, status) =>
+      id match {
+        case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
+        case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
+        case _ => fail("Updated block is neither list1 nor list3")
+      }
+    }
+    assert(store.get("list3").isDefined, "list3 was not in store")
+
+    // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
+    val updatedBlocks4 =
+      store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(updatedBlocks4.size === 2)
+    updatedBlocks4.foreach { case (id, status) =>
+      id match {
+        case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
+        case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
+        case _ => fail("Updated block is neither list2 nor list4")
+      }
+    }
+    assert(store.get("list4").isDefined, "list4 was not in store")
+
+    // No updated blocks - nothing is kicked out of memory because list5 is too big to be added
+    val updatedBlocks5 =
+      store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(updatedBlocks5.size === 0)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(!store.get("list5").isDefined, "list5 was in store")
+  }
+
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
     store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
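
The new "updated block statuses" test relies on BlockManager.put now returning the blocks whose status changed as a side effect of the put, including blocks evicted to make room. A rough sketch of consuming that return value is below; it assumes a BlockManager named store wired up the way this suite's setup does, plus the String-to-BlockId conversion the suite relies on.

    import org.apache.spark.storage._

    // Sketch: inspect the block-status updates returned by put(), as the test above checks.
    // `store` is assumed to be a BlockManager constructed as in this suite's setup.
    val data = List.fill(2)(new Array[Byte](200))
    val updates = store.put("list1", data.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    updates.foreach { case (blockId, status) =>
      if (status.storageLevel == StorageLevel.NONE) {
        println("%s was dropped from the store".format(blockId))          // evicted block
      } else {
        println("%s now stored at %s".format(blockId, status.storageLevel))
      }
    }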

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 3041581..45c3224 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -22,6 +22,7 @@ import java.net.ServerSocket
 import scala.util.{Failure, Success, Try}
 
 import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.servlet.ServletContextHandler
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkConf
@@ -36,22 +37,27 @@ class UISuite extends FunSuite {
       case Failure(e) => 
       // Either case server port is busy hence setup for test complete
     }
-    val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
-      new SparkConf)
-    val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
-      new SparkConf)
+    val serverInfo1 = JettyUtils.startJettyServer(
+      "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
+    val serverInfo2 = JettyUtils.startJettyServer(
+      "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
     // Allow some wiggle room in case ports on the machine are under contention
+    val boundPort1 = serverInfo1.boundPort
+    val boundPort2 = serverInfo2.boundPort
     assert(boundPort1 > startPort && boundPort1 < startPort + 10)
     assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
   }
 
   test("jetty binds to port 0 correctly") {
-    val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq(), new SparkConf)
-    assert(jettyServer.getState === "STARTED")
+    val serverInfo = JettyUtils.startJettyServer(
+      "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
+    val server = serverInfo.server
+    val boundPort = serverInfo.boundPort
+    assert(server.getState === "STARTED")
     assert(boundPort != 0)
-    Try {new ServerSocket(boundPort)} match {
+    Try { new ServerSocket(boundPort) } match {
       case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
-      case Failure  (e) =>
+      case Failure(e) =>
     }
   }
 }
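
JettyUtils.startJettyServer now hands back a ServerInfo instead of a (Server, boundPort) tuple, so callers read the server and the bound port off the returned object. A short sketch of the new call shape, using only what the updated test touches; like the suite, it assumes the caller sits inside a Spark package, since JettyUtils is not public API.

    import org.eclipse.jetty.servlet.ServletContextHandler
    import org.apache.spark.SparkConf

    // Sketch: bind to an ephemeral port and read it back from the returned ServerInfo.
    val serverInfo = JettyUtils.startJettyServer(
      "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
    val boundPort = serverInfo.boundPort   // actual port picked by Jetty
    // ... exercise the server ...
    serverInfo.server.stop()               // serverInfo.server is a plain Jetty Server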

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8ca863e..d8a3e85 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,11 +22,12 @@ import org.scalatest.FunSuite
 import org.apache.spark.{LocalSparkContext, SparkContext, Success}
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
+import org.apache.spark.util.Utils
 
 class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
-    val listener = new JobProgressListener(sc)
+    val listener = new JobProgressListener(sc.conf)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
 
@@ -38,16 +39,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
     taskInfo.finishTime = 1
-    listener.onTaskEnd(new SparkListenerTaskEnd(
-      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    var task = new ShuffleMapTask(0, null, null, 0, null)
+    val taskType = Utils.getFormattedClassName(task)
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
       .shuffleRead == 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
     taskInfo.finishTime = 1
-    listener.onTaskEnd(new SparkListenerTaskEnd(
-      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    task = new ShuffleMapTask(0, null, null, 0, null)
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToExecutorSummaries.size == 1)
 
     // finish this task, should get updated duration
@@ -55,8 +57,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
     taskInfo.finishTime = 1
-    listener.onTaskEnd(new SparkListenerTaskEnd(
-      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    task = new ShuffleMapTask(0, null, null, 0, null)
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
       .shuffleRead == 2000)
 
@@ -65,8 +67,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
     taskInfo.finishTime = 1
-    listener.onTaskEnd(new SparkListenerTaskEnd(
-      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    task = new ShuffleMapTask(0, null, null, 0, null)
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
       .shuffleRead == 1000)
   }