You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:11 UTC
[2/6] [SPARK-1132] Persisting Web UI through refactoring the
SparkListener interface
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
new file mode 100644
index 0000000..346f2b7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.{Properties, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.Map
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST._
+
+import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.scheduler._
+import org.apache.spark.storage._
+import org.apache.spark._
+
+private[spark] object JsonProtocol {
+ // json4s Formats picked up implicitly by every `extract[T]` call in this object.
+ private implicit val format = DefaultFormats
+
+ /** ------------------------------------------------- *
+ * JSON serialization methods for SparkListenerEvents |
+ * -------------------------------------------------- */
+
+  /**
+   * Serialize a [[SparkListenerEvent]] by dispatching on its concrete type to the matching
+   * `*ToJson` method of this object.
+   *
+   * `SparkListenerShutdown` is not expected to be serialized; it maps to `JNothing` only so that
+   * the compiler accepts the match as complete.
+   */
+  def sparkEventToJson(event: SparkListenerEvent): JValue = event match {
+    case e: SparkListenerStageSubmitted => stageSubmittedToJson(e)
+    case e: SparkListenerStageCompleted => stageCompletedToJson(e)
+    case e: SparkListenerTaskStart => taskStartToJson(e)
+    case e: SparkListenerTaskGettingResult => taskGettingResultToJson(e)
+    case e: SparkListenerTaskEnd => taskEndToJson(e)
+    case e: SparkListenerJobStart => jobStartToJson(e)
+    case e: SparkListenerJobEnd => jobEndToJson(e)
+    case e: SparkListenerEnvironmentUpdate => environmentUpdateToJson(e)
+    case e: SparkListenerBlockManagerAdded => blockManagerAddedToJson(e)
+    case e: SparkListenerBlockManagerRemoved => blockManagerRemovedToJson(e)
+    case e: SparkListenerUnpersistRDD => unpersistRDDToJson(e)
+    case SparkListenerShutdown => JNothing
+  }
+
+  /** Serialize a stage-submitted event: the stage's [[StageInfo]] and its properties. */
+  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
+    ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
+    ("Stage Info" -> stageInfoToJson(stageSubmitted.stageInfo)) ~
+    ("Properties" -> propertiesToJson(stageSubmitted.properties))
+  }
+
+  /** Serialize a stage-completed event, which carries only the stage's [[StageInfo]]. */
+  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
+    ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
+    ("Stage Info" -> stageInfoToJson(stageCompleted.stageInfo))
+  }
+
+  /** Serialize a task-start event. A null task info is written as `JNothing`. */
+  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
+    val infoJson = Option(taskStart.taskInfo).map(taskInfoToJson).getOrElse(JNothing)
+    ("Event" -> Utils.getFormattedClassName(taskStart)) ~
+    ("Stage ID" -> taskStart.stageId) ~
+    ("Task Info" -> infoJson)
+  }
+
+  /** Serialize a getting-result event. A null task info is written as `JNothing`. */
+  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
+    val infoJson = Option(taskGettingResult.taskInfo).map(taskInfoToJson).getOrElse(JNothing)
+    ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
+    ("Task Info" -> infoJson)
+  }
+
+  /**
+   * Serialize a task-end event. The task info and the task metrics may each be null; a null
+   * value is written as `JNothing`.
+   */
+  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
+    val infoJson = Option(taskEnd.taskInfo).map(taskInfoToJson).getOrElse(JNothing)
+    val metricsJson = Option(taskEnd.taskMetrics).map(taskMetricsToJson).getOrElse(JNothing)
+    ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
+    ("Stage ID" -> taskEnd.stageId) ~
+    ("Task Type" -> taskEnd.taskType) ~
+    ("Task End Reason" -> taskEndReasonToJson(taskEnd.reason)) ~
+    ("Task Info" -> infoJson) ~
+    ("Task Metrics" -> metricsJson)
+  }
+
+  /** Serialize a job-start event: the job id, the ids of its stages, and its properties. */
+  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
+    ("Event" -> Utils.getFormattedClassName(jobStart)) ~
+    ("Job ID" -> jobStart.jobId) ~
+    ("Stage IDs" -> jobStart.stageIds) ~
+    ("Properties" -> propertiesToJson(jobStart.properties))
+  }
+
+  /** Serialize a job-end event: the job id and its [[JobResult]]. */
+  def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
+    ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
+    ("Job ID" -> jobEnd.jobId) ~
+    ("Job Result" -> jobResultToJson(jobEnd.jobResult))
+  }
+
+  /**
+   * Serialize an environment-update event. Each of the four named sections of
+   * `environmentDetails` is written as a flat JSON object; a missing section makes the map
+   * lookup throw.
+   */
+  def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = {
+    val details = environmentUpdate.environmentDetails
+    def section(name: String): JValue = mapToJson(details(name).toMap)
+    ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
+    ("JVM Information" -> section("JVM Information")) ~
+    ("Spark Properties" -> section("Spark Properties")) ~
+    ("System Properties" -> section("System Properties")) ~
+    ("Classpath Entries" -> section("Classpath Entries"))
+  }
+
+  /** Serialize a block-manager-added event: the manager's id and its maximum memory. */
+  def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
+    ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
+    ("Block Manager ID" -> blockManagerIdToJson(blockManagerAdded.blockManagerId)) ~
+    ("Maximum Memory" -> blockManagerAdded.maxMem)
+  }
+
+  /** Serialize a block-manager-removed event, which carries only the manager's id. */
+  def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
+    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
+    ("Block Manager ID" -> blockManagerIdToJson(blockManagerRemoved.blockManagerId))
+  }
+
+  /** Serialize an unpersist-RDD event, which carries only the RDD id. */
+  def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
+    val eventName = Utils.getFormattedClassName(unpersistRDD)
+    ("Event" -> eventName) ~ ("RDD ID" -> unpersistRDD.rddId)
+  }
+
+
+ /** ------------------------------------------------------------------- *
+ * JSON serialization methods for classes SparkListenerEvents depend on |
+ * -------------------------------------------------------------------- */
+
+  /**
+   * Serialize a [[StageInfo]]. The optional submission and completion times become `JNothing`
+   * when unset.
+   */
+  def stageInfoToJson(stageInfo: StageInfo): JValue = {
+    def timeToJson(time: Option[Long]): JValue = time.map(t => JInt(t)).getOrElse(JNothing)
+    ("Stage ID" -> stageInfo.stageId) ~
+    ("Stage Name" -> stageInfo.name) ~
+    ("Number of Tasks" -> stageInfo.numTasks) ~
+    ("RDD Info" -> rddInfoToJson(stageInfo.rddInfo)) ~
+    ("Submission Time" -> timeToJson(stageInfo.submissionTime)) ~
+    ("Completion Time" -> timeToJson(stageInfo.completionTime)) ~
+    ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
+  }
+
+  /** Serialize a [[TaskInfo]]; the task locality is written by its `toString` name. */
+  def taskInfoToJson(taskInfo: TaskInfo): JValue = {
+    val locality = taskInfo.taskLocality.toString
+    ("Task ID" -> taskInfo.taskId) ~
+    ("Index" -> taskInfo.index) ~
+    ("Launch Time" -> taskInfo.launchTime) ~
+    ("Executor ID" -> taskInfo.executorId) ~
+    ("Host" -> taskInfo.host) ~
+    ("Locality" -> locality) ~
+    ("Getting Result Time" -> taskInfo.gettingResultTime) ~
+    ("Finish Time" -> taskInfo.finishTime) ~
+    ("Failed" -> taskInfo.failed) ~
+    ("Serialized Size" -> taskInfo.serializedSize)
+  }
+
+  /**
+   * Serialize [[TaskMetrics]]. The optional shuffle read/write metrics and the optional list of
+   * updated blocks become `JNothing` when absent; each updated block is written as an object
+   * holding its "Block ID" and "Status".
+   */
+  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
+    val readJson =
+      taskMetrics.shuffleReadMetrics.fold[JValue](JNothing)(shuffleReadMetricsToJson)
+    val writeJson =
+      taskMetrics.shuffleWriteMetrics.fold[JValue](JNothing)(shuffleWriteMetricsToJson)
+    val blocksJson = taskMetrics.updatedBlocks.fold[JValue](JNothing) { blocks =>
+      JArray(blocks.toList.map { case (blockId, status) =>
+        ("Block ID" -> blockIdToJson(blockId)) ~ ("Status" -> blockStatusToJson(status))
+      })
+    }
+    ("Host Name" -> taskMetrics.hostname) ~
+    ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
+    ("Executor Run Time" -> taskMetrics.executorRunTime) ~
+    ("Result Size" -> taskMetrics.resultSize) ~
+    ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
+    ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
+    ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
+    ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
+    ("Shuffle Read Metrics" -> readJson) ~
+    ("Shuffle Write Metrics" -> writeJson) ~
+    ("Updated Blocks" -> blocksJson)
+  }
+
+  /** Serialize the shuffle-read portion of a task's metrics. */
+  def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
+    val read = shuffleReadMetrics
+    ("Shuffle Finish Time" -> read.shuffleFinishTime) ~
+    ("Total Blocks Fetched" -> read.totalBlocksFetched) ~
+    ("Remote Blocks Fetched" -> read.remoteBlocksFetched) ~
+    ("Local Blocks Fetched" -> read.localBlocksFetched) ~
+    ("Fetch Wait Time" -> read.fetchWaitTime) ~
+    ("Remote Bytes Read" -> read.remoteBytesRead)
+  }
+
+  /** Serialize the shuffle-write portion of a task's metrics. */
+  def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
+    val written = shuffleWriteMetrics
+    ("Shuffle Bytes Written" -> written.shuffleBytesWritten) ~
+    ("Shuffle Write Time" -> written.shuffleWriteTime)
+  }
+
+  /**
+   * Serialize a [[TaskEndReason]] as its formatted class name under "Reason", plus extra fields
+   * for `FetchFailed` and `ExceptionFailure`. Every other reason carries no extra fields.
+   */
+  def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
+    val details: JObject = taskEndReason match {
+      case failure: FetchFailed =>
+        ("Block Manager Address" -> blockManagerIdToJson(failure.bmAddress)) ~
+        ("Shuffle ID" -> failure.shuffleId) ~
+        ("Map ID" -> failure.mapId) ~
+        ("Reduce ID" -> failure.reduceId)
+      case failure: ExceptionFailure =>
+        ("Class Name" -> failure.className) ~
+        ("Description" -> failure.description) ~
+        ("Stack Trace" -> stackTraceToJson(failure.stackTrace)) ~
+        ("Metrics" -> failure.metrics.fold[JValue](JNothing)(taskMetricsToJson))
+      case _ =>
+        Utils.emptyJson
+    }
+    ("Reason" -> Utils.getFormattedClassName(taskEndReason)) ~ details
+  }
+
+  /** Serialize a [[BlockManagerId]]: executor id, host, port and Netty port. */
+  def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
+    val id = blockManagerId
+    ("Executor ID" -> id.executorId) ~
+    ("Host" -> id.host) ~
+    ("Port" -> id.port) ~
+    ("Netty Port" -> id.nettyPort)
+  }
+
+  /**
+   * Serialize a [[JobResult]] as its formatted class name under "Result"; a `JobFailed` also
+   * carries the exception and the id of the failed stage.
+   */
+  def jobResultToJson(jobResult: JobResult): JValue = {
+    val details: JObject = jobResult match {
+      case JobSucceeded =>
+        Utils.emptyJson
+      case failed: JobFailed =>
+        ("Exception" -> exceptionToJson(failed.exception)) ~
+        ("Failed Stage ID" -> failed.failedStageId)
+    }
+    ("Result" -> Utils.getFormattedClassName(jobResult)) ~ details
+  }
+
+  /** Serialize an [[RDDInfo]], including its storage level and cached-partition statistics. */
+  def rddInfoToJson(rddInfo: RDDInfo): JValue = {
+    ("RDD ID" -> rddInfo.id) ~
+    ("Name" -> rddInfo.name) ~
+    ("Storage Level" -> storageLevelToJson(rddInfo.storageLevel)) ~
+    ("Number of Partitions" -> rddInfo.numPartitions) ~
+    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
+    ("Memory Size" -> rddInfo.memSize) ~
+    ("Disk Size" -> rddInfo.diskSize)
+  }
+
+  /** Serialize a [[StorageLevel]] as its disk/memory/deserialized flags and replication. */
+  def storageLevelToJson(storageLevel: StorageLevel): JValue = {
+    val level = storageLevel
+    ("Use Disk" -> level.useDisk) ~
+    ("Use Memory" -> level.useMemory) ~
+    ("Deserialized" -> level.deserialized) ~
+    ("Replication" -> level.replication)
+  }
+
+  /**
+   * Serialize a [[BlockId]] as its formatted class name under "Type", plus the fields that
+   * identify that kind of block. A `BroadcastHelperBlockId` nests its parent id recursively.
+   */
+  def blockIdToJson(blockId: BlockId): JValue = {
+    val fields: JObject = blockId match {
+      case id: RDDBlockId =>
+        ("RDD ID" -> id.rddId) ~
+        ("Split Index" -> id.splitIndex)
+      case id: ShuffleBlockId =>
+        ("Shuffle ID" -> id.shuffleId) ~
+        ("Map ID" -> id.mapId) ~
+        ("Reduce ID" -> id.reduceId)
+      case id: BroadcastBlockId =>
+        "Broadcast ID" -> id.broadcastId
+      case id: BroadcastHelperBlockId =>
+        ("Broadcast Block ID" -> blockIdToJson(id.broadcastId)) ~
+        ("Helper Type" -> id.hType)
+      case id: TaskResultBlockId =>
+        "Task ID" -> id.taskId
+      case id: StreamBlockId =>
+        ("Stream ID" -> id.streamId) ~
+        ("Unique ID" -> id.uniqueId)
+      case id: TempBlockId =>
+        "Temp ID" -> UUIDToJson(id.id)
+      case id: TestBlockId =>
+        "Test ID" -> id.id
+    }
+    ("Type" -> Utils.getFormattedClassName(blockId)) ~ fields
+  }
+
+  /** Serialize a [[BlockStatus]]: its storage level and its memory and disk sizes. */
+  def blockStatusToJson(blockStatus: BlockStatus): JValue = {
+    ("Storage Level" -> storageLevelToJson(blockStatus.storageLevel)) ~
+    ("Memory Size" -> blockStatus.memSize) ~
+    ("Disk Size" -> blockStatus.diskSize)
+  }
+
+
+ /** ------------------------------ *
+ * Util JSON serialization methods |
+ * ------------------------------- */
+
+  /** Serialize a string-to-string map as a flat JSON object of string values. */
+  def mapToJson(m: Map[String, String]): JValue = {
+    val fields = m.map { case (key, value) => JField(key, JString(value)) }
+    JObject(fields.toList)
+  }
+
+  /** Serialize Java [[Properties]] as a flat JSON object; null properties become `JNothing`. */
+  def propertiesToJson(properties: Properties): JValue = {
+    if (properties == null) JNothing else mapToJson(properties.asScala)
+  }
+
+  /** Serialize a [[UUID]] as its least and most significant 64-bit halves. */
+  def UUIDToJson(id: UUID): JValue = {
+    val leastBits = id.getLeastSignificantBits
+    val mostBits = id.getMostSignificantBits
+    ("Least Significant Bits" -> leastBits) ~
+    ("Most Significant Bits" -> mostBits)
+  }
+
+  /**
+   * Serialize a stack trace as a JSON array with one object per frame. Note: a frame's file
+   * name may be null (e.g. when unavailable); it is passed through as-is.
+   */
+  def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = {
+    val frames = stackTrace.toList.map { frame =>
+      ("Declaring Class" -> frame.getClassName) ~
+      ("Method Name" -> frame.getMethodName) ~
+      ("File Name" -> frame.getFileName) ~
+      ("Line Number" -> frame.getLineNumber)
+    }
+    JArray(frames)
+  }
+
+  /** Serialize an exception as its message and stack trace; the exception type is not kept. */
+  def exceptionToJson(exception: Exception): JValue = {
+    val trace = stackTraceToJson(exception.getStackTrace)
+    ("Message" -> exception.getMessage) ~ ("Stack Trace" -> trace)
+  }
+
+
+ /** --------------------------------------------------- *
+ * JSON deserialization methods for SparkListenerEvents |
+ * ---------------------------------------------------- */
+
+  /**
+   * Deserialize any [[SparkListenerEvent]] written by `sparkEventToJson`.
+   *
+   * The "Event" field holds the event's formatted class name, which equals the formatted name
+   * of that event's companion object (`Utils.getFormattedClassName` strips the `$`). An
+   * unrecognized name fails with a `MatchError`.
+   */
+  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+    val eventName = (json \ "Event").extract[String]
+    def isEvent(companion: AnyRef): Boolean =
+      eventName == Utils.getFormattedClassName(companion)
+
+    eventName match {
+      case _ if isEvent(SparkListenerStageSubmitted) => stageSubmittedFromJson(json)
+      case _ if isEvent(SparkListenerStageCompleted) => stageCompletedFromJson(json)
+      case _ if isEvent(SparkListenerTaskStart) => taskStartFromJson(json)
+      case _ if isEvent(SparkListenerTaskGettingResult) => taskGettingResultFromJson(json)
+      case _ if isEvent(SparkListenerTaskEnd) => taskEndFromJson(json)
+      case _ if isEvent(SparkListenerJobStart) => jobStartFromJson(json)
+      case _ if isEvent(SparkListenerJobEnd) => jobEndFromJson(json)
+      case _ if isEvent(SparkListenerEnvironmentUpdate) => environmentUpdateFromJson(json)
+      case _ if isEvent(SparkListenerBlockManagerAdded) => blockManagerAddedFromJson(json)
+      case _ if isEvent(SparkListenerBlockManagerRemoved) => blockManagerRemovedFromJson(json)
+      case _ if isEvent(SparkListenerUnpersistRDD) => unpersistRDDFromJson(json)
+    }
+  }
+
+  /** Deserialize a stage-submitted event written by `stageSubmittedToJson`. */
+  def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
+    SparkListenerStageSubmitted(
+      stageInfoFromJson(json \ "Stage Info"),
+      propertiesFromJson(json \ "Properties"))
+  }
+
+  /** Deserialize a stage-completed event written by `stageCompletedToJson`. */
+  def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
+    SparkListenerStageCompleted(stageInfoFromJson(json \ "Stage Info"))
+  }
+
+  /**
+   * Deserialize a task-start event written by `taskStartToJson`.
+   *
+   * `taskStartToJson` writes `JNothing` for a null task info, so an absent "Task Info" field is
+   * read back as null instead of failing extraction.
+   */
+  def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
+    val stageId = (json \ "Stage ID").extract[Int]
+    val taskInfo = Utils.jsonOption(json \ "Task Info").map(taskInfoFromJson).orNull
+    SparkListenerTaskStart(stageId, taskInfo)
+  }
+
+  /**
+   * Deserialize a getting-result event written by `taskGettingResultToJson`. An absent
+   * "Task Info" field (written for a null task info) is read back as null.
+   */
+  def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
+    val taskInfo = Utils.jsonOption(json \ "Task Info").map(taskInfoFromJson).orNull
+    SparkListenerTaskGettingResult(taskInfo)
+  }
+
+  /**
+   * Deserialize a task-end event written by `taskEndToJson`.
+   *
+   * `taskEndToJson` writes `JNothing` when the task info or the task metrics are null. Both
+   * fields are therefore optional here and are read back as null when absent, so such events
+   * round-trip instead of failing extraction.
+   */
+  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
+    val stageId = (json \ "Stage ID").extract[Int]
+    val taskType = (json \ "Task Type").extract[String]
+    val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
+    val taskInfo = Utils.jsonOption(json \ "Task Info").map(taskInfoFromJson).orNull
+    val taskMetrics = Utils.jsonOption(json \ "Task Metrics").map(taskMetricsFromJson).orNull
+    SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, taskMetrics)
+  }
+
+  /** Deserialize a job-start event written by `jobStartToJson`. */
+  def jobStartFromJson(json: JValue): SparkListenerJobStart = {
+    val jobId = (json \ "Job ID").extract[Int]
+    val stageIds = for (id <- (json \ "Stage IDs").extract[List[JValue]]) yield id.extract[Int]
+    SparkListenerJobStart(jobId, stageIds, propertiesFromJson(json \ "Properties"))
+  }
+
+  /** Deserialize a job-end event written by `jobEndToJson`. */
+  def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
+    SparkListenerJobEnd(
+      (json \ "Job ID").extract[Int],
+      jobResultFromJson(json \ "Job Result"))
+  }
+
+  /**
+   * Deserialize an environment-update event written by `environmentUpdateToJson`, rebuilding
+   * the four named sections as sequences of key/value pairs.
+   */
+  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
+    val sectionNames =
+      Seq("JVM Information", "Spark Properties", "System Properties", "Classpath Entries")
+    val environmentDetails = Map[String, Seq[(String, String)]](
+      sectionNames.map(name => name -> mapFromJson(json \ name).toSeq): _*)
+    SparkListenerEnvironmentUpdate(environmentDetails)
+  }
+
+  /** Deserialize a block-manager-added event written by `blockManagerAddedToJson`. */
+  def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
+    SparkListenerBlockManagerAdded(
+      blockManagerIdFromJson(json \ "Block Manager ID"),
+      (json \ "Maximum Memory").extract[Long])
+  }
+
+  /** Deserialize a block-manager-removed event written by `blockManagerRemovedToJson`. */
+  def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
+    SparkListenerBlockManagerRemoved(blockManagerIdFromJson(json \ "Block Manager ID"))
+  }
+
+  /** Deserialize an unpersist-RDD event written by `unpersistRDDToJson`. */
+  def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
+    val rddId = (json \ "RDD ID").extract[Int]
+    SparkListenerUnpersistRDD(rddId)
+  }
+
+
+ /** --------------------------------------------------------------------- *
+ * JSON deserialization methods for classes SparkListenerEvents depend on |
+ * ---------------------------------------------------------------------- */
+
+  /**
+   * Deserialize a [[StageInfo]] written by `stageInfoToJson`. Absent submission or completion
+   * times are restored as `None`.
+   */
+  def stageInfoFromJson(json: JValue): StageInfo = {
+    def optionalTime(field: String): Option[Long] =
+      Utils.jsonOption(json \ field).map(_.extract[Long])
+
+    val id = (json \ "Stage ID").extract[Int]
+    val stageName = (json \ "Stage Name").extract[String]
+    val taskCount = (json \ "Number of Tasks").extract[Int]
+    val rdd = rddInfoFromJson(json \ "RDD Info")
+    val submitted = optionalTime("Submission Time")
+    val completed = optionalTime("Completion Time")
+    val sizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
+
+    val info = new StageInfo(id, stageName, taskCount, rdd)
+    info.submissionTime = submitted
+    info.completionTime = completed
+    info.emittedTaskSizeWarning = sizeWarning
+    info
+  }
+
+  /**
+   * Deserialize a [[TaskInfo]] written by `taskInfoToJson`. The locality is parsed back with
+   * `TaskLocality.withName`; the mutable timing/status fields are set after construction.
+   */
+  def taskInfoFromJson(json: JValue): TaskInfo = {
+    val id = (json \ "Task ID").extract[Long]
+    val idx = (json \ "Index").extract[Int]
+    val launchedAt = (json \ "Launch Time").extract[Long]
+    val executor = (json \ "Executor ID").extract[String]
+    val hostName = (json \ "Host").extract[String]
+    val locality = TaskLocality.withName((json \ "Locality").extract[String])
+    val resultFetchStart = (json \ "Getting Result Time").extract[Long]
+    val finishedAt = (json \ "Finish Time").extract[Long]
+    val hasFailed = (json \ "Failed").extract[Boolean]
+    val size = (json \ "Serialized Size").extract[Int]
+
+    val info = new TaskInfo(id, idx, launchedAt, executor, hostName, locality)
+    info.gettingResultTime = resultFetchStart
+    info.finishTime = finishedAt
+    info.failed = hasFailed
+    info.serializedSize = size
+    info
+  }
+
+  /**
+   * Deserialize [[TaskMetrics]] written by `taskMetricsToJson`. The shuffle read/write metrics
+   * and the updated-blocks list are set to `None` when their fields are absent.
+   */
+  def taskMetricsFromJson(json: JValue): TaskMetrics = {
+    def longField(name: String): Long = (json \ name).extract[Long]
+
+    val metrics = new TaskMetrics
+    metrics.hostname = (json \ "Host Name").extract[String]
+    metrics.executorDeserializeTime = longField("Executor Deserialize Time")
+    metrics.executorRunTime = longField("Executor Run Time")
+    metrics.resultSize = longField("Result Size")
+    metrics.jvmGCTime = longField("JVM GC Time")
+    metrics.resultSerializationTime = longField("Result Serialization Time")
+    metrics.memoryBytesSpilled = longField("Memory Bytes Spilled")
+    metrics.diskBytesSpilled = longField("Disk Bytes Spilled")
+    metrics.shuffleReadMetrics =
+      Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
+    metrics.shuffleWriteMetrics =
+      Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+    metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { blocksJson =>
+      blocksJson.extract[List[JValue]].map { entry =>
+        (blockIdFromJson(entry \ "Block ID"), blockStatusFromJson(entry \ "Status"))
+      }
+    }
+    metrics
+  }
+
+  /** Deserialize [[ShuffleReadMetrics]] written by `shuffleReadMetricsToJson`. */
+  def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
+    def intField(name: String): Int = (json \ name).extract[Int]
+    def longField(name: String): Long = (json \ name).extract[Long]
+
+    val read = new ShuffleReadMetrics
+    read.shuffleFinishTime = longField("Shuffle Finish Time")
+    read.totalBlocksFetched = intField("Total Blocks Fetched")
+    read.remoteBlocksFetched = intField("Remote Blocks Fetched")
+    read.localBlocksFetched = intField("Local Blocks Fetched")
+    read.fetchWaitTime = longField("Fetch Wait Time")
+    read.remoteBytesRead = longField("Remote Bytes Read")
+    read
+  }
+
+  /** Deserialize [[ShuffleWriteMetrics]] written by `shuffleWriteMetricsToJson`. */
+  def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
+    val written = new ShuffleWriteMetrics
+    written.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
+    written.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
+    written
+  }
+
+  /**
+   * Deserialize a [[TaskEndReason]] written by `taskEndReasonToJson`.
+   *
+   * The "Reason" field is compared with the formatted class name of each reason's companion or
+   * case object. Only `FetchFailed` and `ExceptionFailure` read extra fields; an
+   * `ExceptionFailure` without a "Metrics" field gets `None`. An unknown reason name fails with
+   * a `MatchError`.
+   */
+  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+    val reasonName = (json \ "Reason").extract[String]
+    def isReason(companion: AnyRef): Boolean =
+      reasonName == Utils.getFormattedClassName(companion)
+
+    reasonName match {
+      case _ if isReason(Success) => Success
+      case _ if isReason(Resubmitted) => Resubmitted
+      case _ if isReason(FetchFailed) =>
+        new FetchFailed(
+          blockManagerIdFromJson(json \ "Block Manager Address"),
+          (json \ "Shuffle ID").extract[Int],
+          (json \ "Map ID").extract[Int],
+          (json \ "Reduce ID").extract[Int])
+      case _ if isReason(ExceptionFailure) =>
+        new ExceptionFailure(
+          (json \ "Class Name").extract[String],
+          (json \ "Description").extract[String],
+          stackTraceFromJson(json \ "Stack Trace"),
+          Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson))
+      case _ if isReason(TaskResultLost) => TaskResultLost
+      case _ if isReason(TaskKilled) => TaskKilled
+      case _ if isReason(ExecutorLostFailure) => ExecutorLostFailure
+      case _ if isReason(UnknownReason) => UnknownReason
+    }
+  }
+
+  /** Deserialize a [[BlockManagerId]] written by `blockManagerIdToJson`. */
+  def blockManagerIdFromJson(json: JValue): BlockManagerId = {
+    BlockManagerId(
+      (json \ "Executor ID").extract[String],
+      (json \ "Host").extract[String],
+      (json \ "Port").extract[Int],
+      (json \ "Netty Port").extract[Int])
+  }
+
+  /**
+   * Deserialize a [[JobResult]] written by `jobResultToJson`. The exception of a `JobFailed`
+   * is rebuilt by `exceptionFromJson`. An unknown result name fails with a `MatchError`.
+   */
+  def jobResultFromJson(json: JValue): JobResult = {
+    val resultName = (json \ "Result").extract[String]
+    def isResult(companion: AnyRef): Boolean =
+      resultName == Utils.getFormattedClassName(companion)
+
+    resultName match {
+      case _ if isResult(JobSucceeded) => JobSucceeded
+      case _ if isResult(JobFailed) =>
+        new JobFailed(
+          exceptionFromJson(json \ "Exception"),
+          (json \ "Failed Stage ID").extract[Int])
+    }
+  }
+
+  /**
+   * Deserialize an [[RDDInfo]] written by `rddInfoToJson`; the cached-partition count and the
+   * memory/disk sizes are set after construction.
+   */
+  def rddInfoFromJson(json: JValue): RDDInfo = {
+    val id = (json \ "RDD ID").extract[Int]
+    val rddName = (json \ "Name").extract[String]
+    val level = storageLevelFromJson(json \ "Storage Level")
+    val partitions = (json \ "Number of Partitions").extract[Int]
+    val cachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
+    val inMemory = (json \ "Memory Size").extract[Long]
+    val onDisk = (json \ "Disk Size").extract[Long]
+
+    val info = new RDDInfo(id, rddName, partitions, level)
+    info.numCachedPartitions = cachedPartitions
+    info.memSize = inMemory
+    info.diskSize = onDisk
+    info
+  }
+
+  /** Deserialize a [[StorageLevel]] written by `storageLevelToJson`. */
+  def storageLevelFromJson(json: JValue): StorageLevel = {
+    val useDisk = (json \ "Use Disk").extract[Boolean]
+    val useMemory = (json \ "Use Memory").extract[Boolean]
+    val deserialized = (json \ "Deserialized").extract[Boolean]
+    val replicas = (json \ "Replication").extract[Int]
+    StorageLevel(useDisk, useMemory, deserialized, replicas)
+  }
+
+  /**
+   * Deserialize a [[BlockId]] written by `blockIdToJson`.
+   *
+   * The "Type" field is compared with the formatted class name of each block-id companion. A
+   * `BroadcastHelperBlockId` deserializes its parent recursively and casts it to
+   * `BroadcastBlockId`. An unknown type name fails with a `MatchError`.
+   */
+  def blockIdFromJson(json: JValue): BlockId = {
+    val blockType = (json \ "Type").extract[String]
+    def isType(companion: AnyRef): Boolean =
+      blockType == Utils.getFormattedClassName(companion)
+
+    blockType match {
+      case _ if isType(RDDBlockId) =>
+        new RDDBlockId((json \ "RDD ID").extract[Int], (json \ "Split Index").extract[Int])
+      case _ if isType(ShuffleBlockId) =>
+        new ShuffleBlockId(
+          (json \ "Shuffle ID").extract[Int],
+          (json \ "Map ID").extract[Int],
+          (json \ "Reduce ID").extract[Int])
+      case _ if isType(BroadcastBlockId) =>
+        new BroadcastBlockId((json \ "Broadcast ID").extract[Long])
+      case _ if isType(BroadcastHelperBlockId) =>
+        val parent = blockIdFromJson(json \ "Broadcast Block ID").asInstanceOf[BroadcastBlockId]
+        new BroadcastHelperBlockId(parent, (json \ "Helper Type").extract[String])
+      case _ if isType(TaskResultBlockId) =>
+        new TaskResultBlockId((json \ "Task ID").extract[Long])
+      case _ if isType(StreamBlockId) =>
+        new StreamBlockId((json \ "Stream ID").extract[Int], (json \ "Unique ID").extract[Long])
+      case _ if isType(TempBlockId) =>
+        new TempBlockId(UUIDFromJson(json \ "Temp ID"))
+      case _ if isType(TestBlockId) =>
+        new TestBlockId((json \ "Test ID").extract[String])
+    }
+  }
+
+  /** Deserialize a [[BlockStatus]] written by `blockStatusToJson`. */
+  def blockStatusFromJson(json: JValue): BlockStatus = {
+    BlockStatus(
+      storageLevelFromJson(json \ "Storage Level"),
+      (json \ "Memory Size").extract[Long],
+      (json \ "Disk Size").extract[Long])
+  }
+
+
+ /** -------------------------------- *
+ * Util JSON deserialization methods |
+ * --------------------------------- */
+
+  /**
+   * Deserialize a flat JSON object of string values (as written by `mapToJson`) into a map.
+   * Throws if `json` is not a JSON object or if any value is not a JSON string.
+   */
+  def mapFromJson(json: JValue): Map[String, String] = {
+    val fields = json.asInstanceOf[JObject].obj
+    fields.map { case JField(key, JString(value)) => (key, value) }.toMap
+  }
+
+  /**
+   * Deserialize [[Properties]] written by `propertiesToJson`.
+   *
+   * Note: `propertiesToJson` writes `JNothing` for null properties, but an absent field is read
+   * back here as an empty `Properties`, not null.
+   */
+  def propertiesFromJson(json: JValue): Properties = {
+    val properties = new Properties()
+    if (json != JNothing) {
+      // foreach, not map: the traversal exists only for setProperty's side effect.
+      mapFromJson(json).foreach { case (key, value) => properties.setProperty(key, value) }
+    }
+    properties
+  }
+
+  /**
+   * Deserialize a [[UUID]] written by `UUIDToJson`.
+   *
+   * `java.util.UUID`'s constructor takes the most significant bits first and the least
+   * significant bits second. Passing them in the opposite order would produce a different
+   * UUID than the one that was serialized.
+   */
+  def UUIDFromJson(json: JValue): UUID = {
+    val leastSignificantBits = (json \ "Least Significant Bits").extract[Long]
+    val mostSignificantBits = (json \ "Most Significant Bits").extract[Long]
+    new UUID(mostSignificantBits, leastSignificantBits)
+  }
+
+  /** Deserialize a stack trace written by `stackTraceToJson`, one element per frame object. */
+  def stackTraceFromJson(json: JValue): Array[StackTraceElement] = {
+    val frames = for (frame <- json.extract[List[JValue]]) yield {
+      new StackTraceElement(
+        (frame \ "Declaring Class").extract[String],
+        (frame \ "Method Name").extract[String],
+        (frame \ "File Name").extract[String],
+        (frame \ "Line Number").extract[Int])
+    }
+    frames.toArray
+  }
+
+  /**
+   * Rebuild an exception written by `exceptionToJson`. The original exception class is not
+   * stored, so the result is always a plain `java.lang.Exception` with the saved message and
+   * stack trace.
+   */
+  def exceptionFromJson(json: JValue): Exception = {
+    val exception = new Exception((json \ "Message").extract[String])
+    exception.setStackTrace(stackTraceFromJson(json \ "Stack Trace"))
+    exception
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 38a275d..13d9dbd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -32,11 +32,11 @@ import scala.reflect.ClassTag
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.json4s._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
-
+import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
/**
* Various utility methods used by Spark.
@@ -245,7 +245,7 @@ private[spark] object Utils extends Logging {
val userCred = securityMgr.getSecretKey()
if (userCred == null) throw new Exception("Secret key is null with authentication on")
val userInfo = securityMgr.getHttpUser() + ":" + userCred
- new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), uri.getPath(),
+ new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), uri.getPath(),
uri.getQuery(), uri.getFragment())
}
@@ -282,7 +282,7 @@ private[spark] object Utils extends Logging {
uc.setConnectTimeout(timeout)
uc.setReadTimeout(timeout)
uc.connect()
- val in = uc.getInputStream();
+ val in = uc.getInputStream()
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
@@ -328,8 +328,7 @@ private[spark] object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
- val conf = SparkHadoopUtil.get.newConfiguration()
- val fs = FileSystem.get(uri, conf)
+ val fs = getHadoopFileSystem(uri)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
@@ -500,7 +499,7 @@ private[spark] object Utils extends Logging {
* millisecond.
*/
def getUsedTimeMs(startTimeMs: Long): String = {
- return " " + (System.currentTimeMillis - startTimeMs) + " ms"
+    // Milliseconds elapsed since startTimeMs, rendered as " <n> ms" (note the leading space).
+    s" ${System.currentTimeMillis - startTimeMs} ms"
}
/**
@@ -789,7 +788,7 @@ private[spark] object Utils extends Logging {
}
var i = 0
while (i < s.length) {
- var nextChar = s.charAt(i)
+ val nextChar = s.charAt(i)
if (inDoubleQuote) {
if (nextChar == '"') {
inDoubleQuote = false
@@ -895,4 +894,27 @@ private[spark] object Utils extends Logging {
}
count
}
+
+  /**
+   * Return the simple class name of the given object with all dollar signs removed
+   * (for example, the class of a Scala `object Foo` is `Foo$`, which yields "Foo").
+   *
+   * `Class.getSimpleName` can throw `InternalError("Malformed class name")` for some nested
+   * Scala classes on older JVMs; in that case the name is derived from the last non-empty
+   * segment of the binary class name instead of failing.
+   */
+  def getFormattedClassName(obj: AnyRef): String = {
+    val cls = obj.getClass
+    val simpleName =
+      try {
+        cls.getSimpleName
+      } catch {
+        case _: InternalError =>
+          val segments = cls.getName.split(Array('.', '$')).filter(_.nonEmpty)
+          segments.lastOption.getOrElse(cls.getName)
+      }
+    simpleName.replace("$", "")
+  }
+
+  /**
+   * Wrap a JSON value in an [[Option]]: JNothing maps to None and any other JValue to Some.
+   */
+  def jsonOption(json: JValue): Option[JValue] = json match {
+    case JNothing => None
+    case present: JValue => Some(present)
+  }
+
+  /** Build a new JSON object that contains no fields. */
+  def emptyJson: JObject = JObject(List.empty[JField])
+
+  /**
+   * Look up the Hadoop [[FileSystem]] matching the scheme of the given URI, using the
+   * configuration returned by `SparkHadoopUtil.get.newConfiguration()`.
+   */
+  def getHadoopFileSystem(path: URI): FileSystem = {
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    FileSystem.get(path, hadoopConf)
+  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 40e853c..c6b65c7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -43,6 +43,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
+import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;
import org.apache.spark.storage.StorageLevel;
@@ -402,16 +403,16 @@ public class JavaAPISuite implements Serializable {
@Test
public void javaDoubleRDDHistoGram() {
- JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
- // Test using generated buckets
- Tuple2<double[], long[]> results = rdd.histogram(2);
- double[] expected_buckets = {1.0, 2.5, 4.0};
- long[] expected_counts = {2, 2};
- Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
- Assert.assertArrayEquals(expected_counts, results._2);
- // Test with provided buckets
- long[] histogram = rdd.histogram(expected_buckets);
- Assert.assertArrayEquals(expected_counts, histogram);
+ JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+ // Test using generated buckets
+ // histogram(2) is expected to split [1.0, 4.0] into two equal-width buckets at 2.5, two values each.
+ Tuple2<double[], long[]> results = rdd.histogram(2);
+ double[] expected_buckets = {1.0, 2.5, 4.0};
+ long[] expected_counts = {2, 2};
+ Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
+ Assert.assertArrayEquals(expected_counts, results._2);
+ // Test with provided buckets
+ // Supplying the same boundaries explicitly should reproduce the same counts.
+ long[] histogram = rdd.histogram(expected_buckets);
+ Assert.assertArrayEquals(expected_counts, histogram);
}
@Test
@@ -570,7 +571,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = new TaskContext(0, 0, 0, false, false, null);
+ // Pass a TaskMetrics instance (rather than null) as the context's task metrics.
+ TaskContext context = new TaskContext(0, 0, 0, false, false, new TaskMetrics());
+ // The first element of partition 0 (of the two partitions over 1..5) is expected to be 1.
Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index ea936e8..b86923f 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -23,7 +23,8 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage._
// TODO: Test the CacheManager's thread-safety aspects
class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
@@ -54,12 +55,12 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
expecting {
blockManager.get(RDDBlockId(0, 0)).andReturn(None)
blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
- true).andReturn(0)
+ true).andStubReturn(Seq[(BlockId, BlockStatus)]())
}
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
- taskMetrics = null)
+ taskMetrics = TaskMetrics.empty())
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
@@ -72,7 +73,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
- taskMetrics = null)
+ taskMetrics = TaskMetrics.empty())
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
@@ -86,7 +87,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
- taskMetrics = null)
+ taskMetrics = TaskMetrics.empty())
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 20c503d..7a39d1a 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext._
-import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListener}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
/**
* Test suite for cancelling running jobs. We run the cancellation tasks for single job action
@@ -89,7 +89,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
- sc.dagScheduler.addSparkListener(new SparkListener {
+ sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
@@ -161,7 +161,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
{
// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
- sc.dagScheduler.addSparkListener(new SparkListener {
+ sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
@@ -191,7 +191,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
{
// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
- sc.dagScheduler.addSparkListener(new SparkListener {
+ sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 0bac78d..6e7fd55 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -27,8 +27,11 @@ import org.apache.hadoop.fs.Path
import scala.collection.Map
import scala.sys.process._
import scala.util.Try
+
import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.spark.executor.TaskMetrics
+
class PipedRDDSuite extends FunSuite with SharedSparkContext {
test("basic pipe") {
@@ -151,7 +154,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
- taskMetrics = null)
+ taskMetrics = TaskMetrics.empty())
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 3bb9367..b543471 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import org.scalatest.{FunSuite, PrivateMethodTester}
-import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler}
+import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
@@ -32,7 +32,7 @@ class SparkContextSchedulerCreationSuite
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
sc = new SparkContext("local", "test")
val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
- val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
+ val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
sched.asInstanceOf[TaskSchedulerImpl]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index bae3b37..9f2924c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -20,12 +20,9 @@ package org.apache.spark.deploy
import java.io.File
import java.util.Date
+import com.fasterxml.jackson.core.JsonParseException
import org.json4s._
-
-import org.json4s.JValue
import org.json4s.jackson.JsonMethods
-import com.fasterxml.jackson.core.JsonParseException
-
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
@@ -96,7 +93,7 @@ class JsonProtocolSuite extends FunSuite {
def createAppInfo() : ApplicationInfo = {
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
- "id", createAppDesc(), JsonConstants.submitDate, null, "appUriStr", Int.MaxValue)
+ "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
+ // endTime is pinned to JsonConstants.currTimeInMillis; appInfoJsonStr expects "duration" to be
+ // currTimeInMillis - appInfoStartTime (presumably computed as endTime - startTime; confirm).
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}
@@ -148,12 +145,12 @@ object JsonConstants {
val submitDate = new Date(123456789)
val appInfoJsonStr =
"""
- |{"starttime":3,"id":"id","name":"name","appuiurl":"appUriStr",
+ |{"starttime":3,"id":"id","name":"name",
|"cores":4,"user":"%s",
|"memoryperslave":1234,"submitdate":"%s",
|"state":"WAITING","duration":%d}
""".format(System.getProperty("user.name", "<unknown>"),
- submitDate.toString, (currTimeInMillis - appInfoStartTime)).stripMargin
+ submitDate.toString, currTimeInMillis - appInfoStartTime).stripMargin
val workerInfoJsonStr =
"""
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ad890b4..c97543f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -94,7 +94,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
cacheLocations.clear()
results.clear()
mapOutputTracker = new MapOutputTrackerMaster(conf)
- scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
+ scheduler = new DAGScheduler(
+ taskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
runLocallyWithinThread(job)
@@ -422,15 +427,15 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
+ /** Assert that every job, stage and task bookkeeping structure of the scheduler is empty. */
private def assertDataStructuresEmpty = {
assert(scheduler.pendingTasks.isEmpty)
assert(scheduler.activeJobs.isEmpty)
- assert(scheduler.failed.isEmpty)
- assert(scheduler.idToActiveJob.isEmpty)
+ assert(scheduler.failedStages.isEmpty)
+ assert(scheduler.stageIdToActiveJob.isEmpty)
assert(scheduler.jobIdToStageIds.isEmpty)
assert(scheduler.stageIdToJobIds.isEmpty)
assert(scheduler.stageIdToStage.isEmpty)
assert(scheduler.stageToInfos.isEmpty)
assert(scheduler.resultStageToJob.isEmpty)
- assert(scheduler.running.isEmpty)
+ assert(scheduler.runningStages.isEmpty)
assert(scheduler.shuffleToMapStage.isEmpty)
- assert(scheduler.waiting.isEmpty)
+ assert(scheduler.waitingStages.isEmpty)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
deleted file mode 100644
index 25fe63c..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
- val WAIT_TIMEOUT_MILLIS = 10000
-
- test("inner method") {
- sc = new SparkContext("local", "joblogger")
- val joblogger = new JobLogger {
- def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
- def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
- def getRddNameTest(rdd: RDD[_]) = getRddName(rdd)
- def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
- }
- type MyRDD = RDD[(Int, Int)]
- def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
- val maxPartition = numPartitions - 1
- new MyRDD(sc, dependencies) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- throw new RuntimeException("should not be reached")
- override def getPartitions = (0 to maxPartition).map(i => new Partition {
- override def index = i
- }).toArray
- }
- }
- val jobID = 5
- val parentRdd = makeRdd(4, Nil)
- val shuffleDep = new ShuffleDependency(parentRdd, null)
- val rootRdd = makeRdd(4, List(shuffleDep))
- val shuffleMapStage =
- new Stage(1, parentRdd, parentRdd.partitions.size, Some(shuffleDep), Nil, jobID, None)
- val rootStage =
- new Stage(0, rootRdd, rootRdd.partitions.size, None, List(shuffleMapStage), jobID, None)
- val rootStageInfo = new StageInfo(rootStage)
-
- joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
- joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
- parentRdd.setName("MyRDD")
- joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
- joblogger.createLogWriterTest(jobID)
- joblogger.getJobIDtoPrintWriter.size should be (1)
- joblogger.buildJobDepTest(jobID, rootStage)
- joblogger.getJobIDToStages.get(jobID).get.size should be (2)
- joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
- joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
- joblogger.closeLogWriterTest(jobID)
- joblogger.getStageIDToJobID.size should be (0)
- joblogger.getJobIDToStages.size should be (0)
- joblogger.getJobIDtoPrintWriter.size should be (0)
- }
-
- test("inner variables") {
- sc = new SparkContext("local[4]", "joblogger")
- val joblogger = new JobLogger {
- override protected def closeLogWriter(jobID: Int) =
- getJobIDtoPrintWriter.get(jobID).foreach { fileWriter =>
- fileWriter.close()
- }
- }
- sc.addSparkListener(joblogger)
- val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
- rdd.reduceByKey(_+_).collect()
-
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-
- val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
-
- joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
- joblogger.getJobIDtoPrintWriter.size should be (1)
- joblogger.getStageIDToJobID.size should be (2)
- joblogger.getStageIDToJobID.get(0) should be (Some(0))
- joblogger.getStageIDToJobID.get(1) should be (Some(0))
- joblogger.getJobIDToStages.size should be (1)
- }
-
-
- test("interface functions") {
- sc = new SparkContext("local[4]", "joblogger")
- val joblogger = new JobLogger {
- var onTaskEndCount = 0
- var onJobEndCount = 0
- var onJobStartCount = 0
- var onStageCompletedCount = 0
- var onStageSubmittedCount = 0
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
- override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
- override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
- }
- sc.addSparkListener(joblogger)
- val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
- rdd.reduceByKey(_+_).collect()
-
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-
- joblogger.onJobStartCount should be (1)
- joblogger.onJobEndCount should be (1)
- joblogger.onTaskEndCount should be (8)
- joblogger.onStageSubmittedCount should be (2)
- joblogger.onStageCompletedCount should be (2)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7c4f2b4..a25ce35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,13 +17,14 @@
package org.apache.spark.scheduler
-import scala.collection.mutable.{Buffer, HashSet}
+import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.SparkContext._
+import org.apache.spark.executor.TaskMetrics
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
with BeforeAndAfter with BeforeAndAfterAll {
@@ -38,43 +39,76 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
System.clearProperty("spark.akka.frameSize")
}
+  test("basic creation and shutdown of LiveListenerBus") {
+    val counter = new BasicJobCounter
+    val bus = new LiveListenerBus
+    bus.addListener(counter)
+
+    // Listener bus hasn't started yet, so posting events should not increment counter
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    assert(counter.count === 0)
+
+    // Starting listener bus should flush all buffered events. Delivery is asynchronous, so wait
+    // for the queue to drain instead of sleeping for a fixed (and possibly too short) interval.
+    bus.start()
+    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(counter.count === 5)
+
+    // After listener bus has stopped, posting events should not increment counter
+    bus.stop()
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    assert(counter.count === 5)
+
+    // Listener bus must not be started twice
+    intercept[IllegalStateException] {
+      val startedTwice = new LiveListenerBus
+      startedTwice.start()
+      startedTwice.start()
+    }
+
+    // ... or stopped before starting
+    intercept[IllegalStateException] {
+      val neverStarted = new LiveListenerBus
+      neverStarted.stop()
+    }
+  }
+
test("basic creation of StageInfo") {
- val listener = new SaveStageInfo
+ val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
- val rdd2 = rdd1.map(x => x.toString)
+ val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ // Wait for the listener bus to drain before inspecting what the listener recorded.
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {1}
- val first = listener.stageInfos.head
- first.rddName should be {"Target RDD"}
- first.numTasks should be {4}
- first.numPartitions should be {4}
- first.submissionTime should be ('defined)
- first.completionTime should be ('defined)
- first.taskInfos.length should be {4}
+ // Each entry pairs a completed stage's StageInfo with the (TaskInfo, TaskMetrics) of its tasks.
+ val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
+ stageInfo.rddInfo.name should be {"Target RDD"}
+ stageInfo.numTasks should be {4}
+ stageInfo.rddInfo.numPartitions should be {4}
+ stageInfo.submissionTime should be ('defined)
+ stageInfo.completionTime should be ('defined)
+ taskInfoMetrics.length should be {4}
}
test("StageInfo with fewer tasks than partitions") {
- val listener = new SaveStageInfo
+ val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
- val rdd2 = rdd1.map(x => x.toString)
+ val rdd2 = rdd1.map(_.toString)
+ // Run the job on partitions 0 and 1 only, so the 4-partition RDD's stage should have 2 tasks.
sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {1}
- val first = listener.stageInfos.head
- first.numTasks should be {2}
- first.numPartitions should be {4}
+ val (stageInfo, _) = listener.stageInfos.head
+ stageInfo.numTasks should be {2}
+ stageInfo.rddInfo.numPartitions should be {4}
}
test("local metrics") {
- val listener = new SaveStageInfo
+ val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
//just to make sure some of the tasks take a noticeable amount of time
@@ -84,45 +118,45 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
i
}
- val d = sc.parallelize(0 to 1e4.toInt, 64).map{i => w(i)}
+ val d = sc.parallelize(0 to 1e4.toInt, 64).map(w)
d.count()
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (1)
- val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
-
- val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2")
-
- val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)}
+ val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
+ val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
+ val d4 = d2.cogroup(d3, 64).map { case (k, (v1, v2)) =>
+ w(k) -> (v1.size, v2.size)
+ }
d4.setName("A Cogroup")
-
d4.collectAsMap()
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (4)
- listener.stageInfos.foreach { stageInfo =>
- /* small test, so some tasks might take less than 1 millisecond, but average should be greater
- * than 0 ms. */
- checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
+ listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
+ /**
+ * Small test, so some tasks might take less than 1 millisecond, but average should be greater
+ * than 0 ms.
+ */
checkNonZeroAvg(
- stageInfo.taskInfos.map{_._2.executorRunTime.toLong},
+ taskInfoMetrics.map(_._2.executorRunTime),
stageInfo + " executorRunTime")
checkNonZeroAvg(
- stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
+ taskInfoMetrics.map(_._2.executorDeserializeTime),
stageInfo + " executorDeserializeTime")
- if (stageInfo.rddName == d4.name) {
+ if (stageInfo.rddInfo.name == d4.name) {
checkNonZeroAvg(
- stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
+ taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
stageInfo + " fetchWaitTime")
}
- stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
+ taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
- if (stageInfo.rddName == d2.name || stageInfo.rddName == d3.name) {
+ if (stageInfo.rddInfo.name == d2.name || stageInfo.rddInfo.name == d3.name) {
taskMetrics.shuffleWriteMetrics should be ('defined)
taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
}
- if (stageInfo.rddName == d4.name) {
+ if (stageInfo.rddInfo.name == d4.name) {
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
sm.totalBlocksFetched should be > (0)
@@ -142,10 +176,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
System.setProperty("spark.akka.frameSize", "1")
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
+ val result = sc.parallelize(Seq(1), 1)
+ .map { x => 1.to(akkaFrameSize).toArray }
+ .reduce { case (x, y) => x }
assert(result === 1.to(akkaFrameSize).toArray)
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -157,13 +193,13 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sc.addSparkListener(listener)
// Make a task whose result is larger than the akka frame size
- val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+ val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
- assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
- assert(listener.startedGettingResultTasks.isEmpty == true)
+ assert(listener.startedGettingResultTasks.isEmpty)
assert(listener.endedTasks.contains(TASK_INDEX))
}
@@ -204,17 +240,33 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
assert(m.sum / m.size.toDouble > 0.0, msg)
}
- class SaveStageInfo extends SparkListener {
- val stageInfos = Buffer[StageInfo]()
+  /** Listener that counts the job-end events delivered to it. */
+  class BasicJobCounter extends SparkListener {
+    // @volatile: events are delivered asynchronously by the listener bus, so this field is
+    // incremented on the bus's event thread and read from the test thread.
+    @volatile var count = 0
+    override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+  }
+
+  /**
+   * Listener that records, for every completed stage, the (TaskInfo, TaskMetrics) pairs of the
+   * tasks that ended since the previous stage completed.
+   */
+  class SaveStageAndTaskInfo extends SparkListener {
+    val stageInfos = mutable.Map[StageInfo, Seq[(TaskInfo, TaskMetrics)]]()
+    var taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+    override def onTaskEnd(task: SparkListenerTaskEnd) {
+      val info = task.taskInfo
+      val metrics = task.taskMetrics
+      // Only record tasks that carry both pieces of information.
+      if (info != null && metrics != null) {
+        taskInfoMetrics += ((info, metrics))
+      }
+    }
+
override def onStageCompleted(stage: SparkListenerStageCompleted) {
- stageInfos += stage.stage
+      stageInfos(stage.stageInfo) = taskInfoMetrics
+      // Start a fresh buffer so the stored Seq is not mutated by later tasks.
+      taskInfoMetrics = mutable.Buffer.empty
}
}
class SaveTaskEvents extends SparkListener {
- val startedTasks = new HashSet[Int]()
- val startedGettingResultTasks = new HashSet[Int]()
- val endedTasks = new HashSet[Int]()
+ val startedTasks = new mutable.HashSet[Int]()
+ val startedGettingResultTasks = new mutable.HashSet[Int]()
+ val endedTasks = new mutable.HashSet[Int]()
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
startedTasks += taskStart.taskInfo.index
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 6b0800a..9274e01 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.scheduler
import java.util.Properties
-import scala.collection.mutable.ArrayBuffer
-
import org.scalatest.FunSuite
import org.apache.spark._
@@ -270,9 +268,9 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
- var dagScheduler = new DAGScheduler(taskScheduler) {
+ val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
- override def executorGained(execId: String, host: String) {}
+ override def executorAdded(execId: String, host: String) {}
}
val numFreeCores = 1
@@ -291,7 +289,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(1 === taskDescriptions.length)
taskDescriptions(0).executorId
}
- var count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+ val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
assert(count > 0)
assert(count < numTrials)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 73153d2..9af5d3a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -26,7 +26,9 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.FakeClock
-class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
+class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
+ extends DAGScheduler(sc) {
+
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
taskScheduler.startedTasks += taskInfo.index
}
@@ -41,7 +43,7 @@ class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(ta
taskScheduler.endedTasks(taskInfo.index) = reason
}
- override def executorGained(execId: String, host: String) {}
+ override def executorAdded(execId: String, host: String) {}
override def executorLost(execId: String) {}
@@ -66,7 +68,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val executors = new mutable.HashMap[String, String] ++ liveExecutors
- dagScheduler = new FakeDAGScheduler(this)
+ dagScheduler = new FakeDAGScheduler(sc, this)
def removeExecutor(execId: String): Unit = executors -= execId
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 1036b9f..e83cd55 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -28,7 +28,8 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
@@ -57,7 +58,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
conf.set("spark.driver.port", boundPort.toString)
master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
@@ -492,12 +494,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
- assert(store.getSingle("a1").isDefined, "a2 was not in store")
- assert(store.getSingle("a2").isDefined, "a3 was not in store")
- assert(store.getSingle("a3").isDefined, "a1 was not in store")
- assert(store.getSingle("a1").isDefined, "a2 was not in store")
- assert(store.getSingle("a2").isDefined, "a3 was not in store")
- assert(store.getSingle("a3").isDefined, "a1 was not in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingle("a1") == None, "a1 was in store")
@@ -663,6 +662,60 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
+ test("updated block statuses") {
+ // Each put should report the block it stored plus every block it evicted from memory,
+ // each paired with the storage level that block actually ended up at.
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
+ // list: two 200-byte arrays; bigList: eight, used below as a block too big to cache in memory
+ val list = List.fill(2)(new Array[Byte](200))
+ val bigList = List.fill(8)(new Array[Byte](200))
+
+ // 1 updated block (i.e. list1)
+ val updatedBlocks1 =
+ store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(updatedBlocks1.size === 1)
+ assert(updatedBlocks1.head._1 === TestBlockId("list1"))
+ assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
+
+ // 1 updated block (i.e. list2). Although requested as MEMORY_AND_DISK, it fits in memory,
+ // so the reported status level is MEMORY_ONLY
+ val updatedBlocks2 =
+ store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ assert(updatedBlocks2.size === 1)
+ assert(updatedBlocks2.head._1 === TestBlockId("list2"))
+ assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
+
+ // 2 updated blocks - list1 is kicked out of memory while list3 is added.
+ // Checking the size AND the exact key set rejects duplicate or missing block ids.
+ val updatedBlocks3 =
+ store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(updatedBlocks3.size === 2)
+ val statuses3 = updatedBlocks3.toMap
+ assert(statuses3.keySet === Set(TestBlockId("list1"), TestBlockId("list3")))
+ assert(statuses3(TestBlockId("list1")).storageLevel === StorageLevel.NONE)
+ assert(statuses3(TestBlockId("list3")).storageLevel === StorageLevel.MEMORY_ONLY)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+
+ // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
+ val updatedBlocks4 =
+ store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(updatedBlocks4.size === 2)
+ val statuses4 = updatedBlocks4.toMap
+ assert(statuses4.keySet === Set(TestBlockId("list2"), TestBlockId("list4")))
+ assert(statuses4(TestBlockId("list2")).storageLevel === StorageLevel.DISK_ONLY)
+ assert(statuses4(TestBlockId("list4")).storageLevel === StorageLevel.MEMORY_ONLY)
+ assert(store.get("list4").isDefined, "list4 was not in store")
+
+ // No updated blocks - nothing is kicked out of memory because list5 is too big to be added,
+ // so the blocks that were already stored must remain readable
+ val updatedBlocks5 =
+ store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(updatedBlocks5.isEmpty)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list4").isDefined, "list4 was not in store")
+ assert(store.get("list5").isEmpty, "list5 was in store")
+ }
+
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 3041581..45c3224 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -22,6 +22,7 @@ import java.net.ServerSocket
import scala.util.{Failure, Success, Try}
import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.FunSuite
import org.apache.spark.SparkConf
@@ -36,22 +37,27 @@ class UISuite extends FunSuite {
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
- val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
- new SparkConf)
- val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
- new SparkConf)
+ val serverInfo1 = JettyUtils.startJettyServer(
+ "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
+ val serverInfo2 = JettyUtils.startJettyServer(
+ "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
// Allow some wiggle room in case ports on the machine are under contention
+ val boundPort1 = serverInfo1.boundPort
+ val boundPort2 = serverInfo2.boundPort
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
}
test("jetty binds to port 0 correctly") {
- val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq(), new SparkConf)
- assert(jettyServer.getState === "STARTED")
+ val serverInfo = JettyUtils.startJettyServer(
+ "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
+ val server = serverInfo.server
+ val boundPort = serverInfo.boundPort
+ assert(server.getState === "STARTED")
assert(boundPort != 0)
- Try {new ServerSocket(boundPort)} match {
+ Try { new ServerSocket(boundPort) } match {
case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
- case Failure (e) =>
+ case Failure(e) =>
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8ca863e..d8a3e85 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,11 +22,12 @@ import org.scalatest.FunSuite
import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
+import org.apache.spark.util.Utils
class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
test("test executor id to summary") {
val sc = new SparkContext("local", "test")
- val listener = new JobProgressListener(sc)
+ val listener = new JobProgressListener(sc.conf)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
@@ -38,16 +39,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
- listener.onTaskEnd(new SparkListenerTaskEnd(
- new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ var task = new ShuffleMapTask(0, null, null, 0, null)
+ val taskType = Utils.getFormattedClassName(task)
+ listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
.shuffleRead == 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
- listener.onTaskEnd(new SparkListenerTaskEnd(
- new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ task = new ShuffleMapTask(0, null, null, 0, null)
+ listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.size == 1)
// finish this task, should get updated duration
@@ -55,8 +57,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
- listener.onTaskEnd(new SparkListenerTaskEnd(
- new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ task = new ShuffleMapTask(0, null, null, 0, null)
+ listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
.shuffleRead == 2000)
@@ -65,8 +67,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
- listener.onTaskEnd(new SparkListenerTaskEnd(
- new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ task = new ShuffleMapTask(0, null, null, 0, null)
+ listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
.shuffleRead == 1000)
}