Posted to commits@spark.apache.org by ir...@apache.org on 2015/05/05 14:26:09 UTC
[09/10] spark git commit: [SPARK-3454] separate json endpoints for data in the UI
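The commit adds a versioned JSON API that mirrors the data shown in the web UI. As a minimal sketch of consuming it from a running application, assuming the API root is mounted at /api/v1 on the default UI port 4040 (the mount point and port are assumptions, not shown in this excerpt):

    import scala.io.Source

    // Fetch the application list as JSON; deeper paths such as
    // .../applications/<appId>/jobs follow the same layout as the
    // HistoryServerExpectations test paths further down in this diff.
    val apps = Source.fromURL("http://localhost:4040/api/v1/applications").mkString
    println(apps)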
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
new file mode 100644
index 0000000..07b224f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{PathParam, GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.ui.SparkUI
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneRDDResource(ui: SparkUI) {
+
+ @GET
+ def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
+ AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
+ throw new NotFoundException(s"no rdd found with id $rddId")
+ )
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
new file mode 100644
index 0000000..fd24aea
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.SparkException
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.status.api.v1.StageStatus._
+import org.apache.spark.status.api.v1.TaskSorting._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.jobs.UIData.StageUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneStageResource(ui: SparkUI) {
+
+ @GET
+ @Path("")
+ def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
+ withStage(stageId) { stageAttempts =>
+ stageAttempts.map { stage =>
+ AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
+ includeDetails = true)
+ }
+ }
+ }
+
+ @GET
+ @Path("/{stageAttemptId: \\d+}")
+ def oneAttemptData(
+ @PathParam("stageId") stageId: Int,
+ @PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
+ withStageAttempt(stageId, stageAttemptId) { stage =>
+ AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
+ includeDetails = true)
+ }
+ }
+
+ @GET
+ @Path("/{stageAttemptId: \\d+}/taskSummary")
+ def taskSummary(
+ @PathParam("stageId") stageId: Int,
+ @PathParam("stageAttemptId") stageAttemptId: Int,
+ @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
+ : TaskMetricDistributions = {
+ withStageAttempt(stageId, stageAttemptId) { stage =>
+ val quantiles = quantileString.split(",").map { s =>
+ try {
+ s.toDouble
+ } catch {
+ case nfe: NumberFormatException =>
+ throw new BadParameterException("quantiles", "double", s)
+ }
+ }
+ AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles)
+ }
+ }
+
+ @GET
+ @Path("/{stageAttemptId: \\d+}/taskList")
+ def taskList(
+ @PathParam("stageId") stageId: Int,
+ @PathParam("stageAttemptId") stageAttemptId: Int,
+ @DefaultValue("0") @QueryParam("offset") offset: Int,
+ @DefaultValue("20") @QueryParam("length") length: Int,
+ @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
+ withStageAttempt(stageId, stageAttemptId) { stage =>
+ val tasks = stage.ui.taskData.values.map { AllStagesResource.convertTaskData }.toIndexedSeq
+ .sorted(OneStageResource.ordering(sortBy))
+ tasks.slice(offset, offset + length)
+ }
+ }
+
+ private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData)
+
+ private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = {
+ val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
+ if (stageAttempts.isEmpty) {
+ throw new NotFoundException("unknown stage: " + stageId)
+ } else {
+ f(stageAttempts)
+ }
+ }
+
+ private def findStageStatusUIData(
+ listener: JobProgressListener,
+ stageId: Int): Seq[StageStatusInfoUi] = {
+ listener.synchronized {
+ def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = {
+ infos.filter { _.stageId == stageId }.map { info =>
+ val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId),
+ // this is an internal error -- we should always have uiData
+ throw new SparkException(
+ s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}")
+ )
+ StageStatusInfoUi(status, info, ui)
+ }
+ }
+ getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++
+ getStatusInfoUi(COMPLETE, listener.completedStages) ++
+ getStatusInfoUi(FAILED, listener.failedStages) ++
+ getStatusInfoUi(PENDING, listener.pendingStages.values.toSeq)
+ }
+ }
+
+ private def withStageAttempt[T](
+ stageId: Int,
+ stageAttemptId: Int)
+ (f: StageStatusInfoUi => T): T = {
+ withStage(stageId) { attempts =>
+ val oneAttempt = attempts.find { stage => stage.info.attemptId == stageAttemptId }
+ oneAttempt match {
+ case Some(stage) =>
+ f(stage)
+ case None =>
+ val stageAttempts = attempts.map { _.info.attemptId }
+ throw new NotFoundException(s"unknown attempt for stage $stageId. " +
+ s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
+ }
+ }
+ }
+}
+
+object OneStageResource {
+ def ordering(taskSorting: TaskSorting): Ordering[TaskData] = {
+ val extractor: (TaskData => Long) = td =>
+ taskSorting match {
+ case ID => td.taskId
+ case INCREASING_RUNTIME => td.taskMetrics.map { _.executorRunTime }.getOrElse(-1L)
+ case DECREASING_RUNTIME => -td.taskMetrics.map { _.executorRunTime }.getOrElse(-1L)
+ }
+ Ordering.by(extractor)
+ }
+}
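OneStageResource.ordering builds an Ordering[TaskData] from a key extractor, with absent task metrics sorting as -1. A standalone sketch of the same Ordering.by pattern (Task is a hypothetical stand-in here, since TaskData's constructor is private[spark]):

    case class Task(taskId: Long, runTime: Option[Long])

    // DECREASING_RUNTIME negates the extracted key; missing metrics count as -1
    val decreasingRuntime: Ordering[Task] = Ordering.by(t => -t.runTime.getOrElse(-1L))

    val tasks = Seq(Task(1, Some(40L)), Task(2, None), Task(3, Some(10L)))
    println(tasks.sorted(decreasingRuntime).map(_.taskId)) // List(1, 3, 2)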
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala
new file mode 100644
index 0000000..95fbd96
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.WebApplicationException
+import javax.ws.rs.core.Response
+
+import com.sun.jersey.spi.container.{ContainerRequest, ContainerRequestFilter}
+
+private[v1] class SecurityFilter extends ContainerRequestFilter with UIRootFromServletContext {
+ def filter(req: ContainerRequest): ContainerRequest = {
+ val user = Option(req.getUserPrincipal).map { _.getName }.orNull
+ if (uiRoot.securityManager.checkUIViewPermissions(user)) {
+ req
+ } else {
+ throw new WebApplicationException(
+ Response
+ .status(Response.Status.FORBIDDEN)
+ .entity(raw"""user "$user" is not authorized""")
+ .build()
+ )
+ }
+ }
+}
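The filter rejects unauthorized users by throwing a WebApplicationException that carries a prebuilt 403 response. A minimal sketch of that JAX-RS response-building pattern in isolation ("bob" is a placeholder user):

    import javax.ws.rs.core.Response

    val resp = Response
      .status(Response.Status.FORBIDDEN)
      .entity("""user "bob" is not authorized""")
      .build()
    println(resp.getStatus) // 403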
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
new file mode 100644
index 0000000..cee2978
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.text.SimpleDateFormat
+import java.util.TimeZone
+import javax.ws.rs.WebApplicationException
+import javax.ws.rs.core.Response
+import javax.ws.rs.core.Response.Status
+
+import scala.util.Try
+
+private[v1] class SimpleDateParam(val originalValue: String) {
+ val timestamp: Long = {
+ SimpleDateParam.formats.collectFirst {
+ case fmt if Try(fmt.parse(originalValue)).isSuccess =>
+ fmt.parse(originalValue).getTime()
+ }.getOrElse(
+ throw new WebApplicationException(
+ Response
+ .status(Status.BAD_REQUEST)
+ .entity("Couldn't parse date: " + originalValue)
+ .build()
+ )
+ )
+ }
+}
+
+private[v1] object SimpleDateParam {
+
+ val formats: Seq[SimpleDateFormat] = {
+
+ val gmtDay = new SimpleDateFormat("yyyy-MM-dd")
+ gmtDay.setTimeZone(TimeZone.getTimeZone("GMT"))
+
+ Seq(
+ new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz"),
+ gmtDay
+ )
+ }
+}
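A small sketch of the two formats SimpleDateParam accepts, using the same patterns defined above; the sample strings match the timestamp style in the test expectations below:

    import java.text.SimpleDateFormat
    import java.util.TimeZone

    val full = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz")
    println(full.parse("2015-02-03T16:42:59.720GMT").getTime)

    val gmtDay = new SimpleDateFormat("yyyy-MM-dd")
    gmtDay.setTimeZone(TimeZone.getTimeZone("GMT"))
    println(gmtDay.parse("2015-02-03").getTime)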
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
new file mode 100644
index 0000000..ef3c857
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.Date
+
+import scala.collection.Map
+
+import org.apache.spark.JobExecutionStatus
+
+class ApplicationInfo private[spark](
+ val id: String,
+ val name: String,
+ val attempts: Seq[ApplicationAttemptInfo])
+
+class ApplicationAttemptInfo private[spark](
+ val attemptId: Option[String],
+ val startTime: Date,
+ val endTime: Date,
+ val sparkUser: String,
+ val completed: Boolean = false)
+
+class ExecutorStageSummary private[spark](
+ val taskTime: Long,
+ val failedTasks: Int,
+ val succeededTasks: Int,
+ val inputBytes: Long,
+ val outputBytes: Long,
+ val shuffleRead: Long,
+ val shuffleWrite: Long,
+ val memoryBytesSpilled: Long,
+ val diskBytesSpilled: Long)
+
+class ExecutorSummary private[spark](
+ val id: String,
+ val hostPort: String,
+ val rddBlocks: Int,
+ val memoryUsed: Long,
+ val diskUsed: Long,
+ val activeTasks: Int,
+ val failedTasks: Int,
+ val completedTasks: Int,
+ val totalTasks: Int,
+ val totalDuration: Long,
+ val totalInputBytes: Long,
+ val totalShuffleRead: Long,
+ val totalShuffleWrite: Long,
+ val maxMemory: Long,
+ val executorLogs: Map[String, String])
+
+class JobData private[spark](
+ val jobId: Int,
+ val name: String,
+ val description: Option[String],
+ val submissionTime: Option[Date],
+ val completionTime: Option[Date],
+ val stageIds: Seq[Int],
+ val jobGroup: Option[String],
+ val status: JobExecutionStatus,
+ val numTasks: Int,
+ val numActiveTasks: Int,
+ val numCompletedTasks: Int,
+ val numSkippedTasks: Int,
+ val numFailedTasks: Int,
+ val numActiveStages: Int,
+ val numCompletedStages: Int,
+ val numSkippedStages: Int,
+ val numFailedStages: Int)
+
+// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage
+// page ... does anybody pay attention to it?
+class RDDStorageInfo private[spark](
+ val id: Int,
+ val name: String,
+ val numPartitions: Int,
+ val numCachedPartitions: Int,
+ val storageLevel: String,
+ val memoryUsed: Long,
+ val diskUsed: Long,
+ val dataDistribution: Option[Seq[RDDDataDistribution]],
+ val partitions: Option[Seq[RDDPartitionInfo]])
+
+class RDDDataDistribution private[spark](
+ val address: String,
+ val memoryUsed: Long,
+ val memoryRemaining: Long,
+ val diskUsed: Long)
+
+class RDDPartitionInfo private[spark](
+ val blockName: String,
+ val storageLevel: String,
+ val memoryUsed: Long,
+ val diskUsed: Long,
+ val executors: Seq[String])
+
+class StageData private[spark](
+ val status: StageStatus,
+ val stageId: Int,
+ val attemptId: Int,
+ val numActiveTasks: Int,
+ val numCompleteTasks: Int,
+ val numFailedTasks: Int,
+
+ val executorRunTime: Long,
+
+ val inputBytes: Long,
+ val inputRecords: Long,
+ val outputBytes: Long,
+ val outputRecords: Long,
+ val shuffleReadBytes: Long,
+ val shuffleReadRecords: Long,
+ val shuffleWriteBytes: Long,
+ val shuffleWriteRecords: Long,
+ val memoryBytesSpilled: Long,
+ val diskBytesSpilled: Long,
+
+ val name: String,
+ val details: String,
+ val schedulingPool: String,
+
+ val accumulatorUpdates: Seq[AccumulableInfo],
+ val tasks: Option[Map[Long, TaskData]],
+ val executorSummary: Option[Map[String, ExecutorStageSummary]])
+
+class TaskData private[spark](
+ val taskId: Long,
+ val index: Int,
+ val attempt: Int,
+ val launchTime: Date,
+ val executorId: String,
+ val host: String,
+ val taskLocality: String,
+ val speculative: Boolean,
+ val accumulatorUpdates: Seq[AccumulableInfo],
+ val errorMessage: Option[String] = None,
+ val taskMetrics: Option[TaskMetrics] = None)
+
+class TaskMetrics private[spark](
+ val executorDeserializeTime: Long,
+ val executorRunTime: Long,
+ val resultSize: Long,
+ val jvmGcTime: Long,
+ val resultSerializationTime: Long,
+ val memoryBytesSpilled: Long,
+ val diskBytesSpilled: Long,
+ val inputMetrics: Option[InputMetrics],
+ val outputMetrics: Option[OutputMetrics],
+ val shuffleReadMetrics: Option[ShuffleReadMetrics],
+ val shuffleWriteMetrics: Option[ShuffleWriteMetrics])
+
+class InputMetrics private[spark](
+ val bytesRead: Long,
+ val recordsRead: Long)
+
+class OutputMetrics private[spark](
+ val bytesWritten: Long,
+ val recordsWritten: Long)
+
+class ShuffleReadMetrics private[spark](
+ val remoteBlocksFetched: Int,
+ val localBlocksFetched: Int,
+ val fetchWaitTime: Long,
+ val remoteBytesRead: Long,
+ val totalBlocksFetched: Int,
+ val recordsRead: Long)
+
+class ShuffleWriteMetrics private[spark](
+ val bytesWritten: Long,
+ val writeTime: Long,
+ val recordsWritten: Long)
+
+class TaskMetricDistributions private[spark](
+ val quantiles: IndexedSeq[Double],
+
+ val executorDeserializeTime: IndexedSeq[Double],
+ val executorRunTime: IndexedSeq[Double],
+ val resultSize: IndexedSeq[Double],
+ val jvmGcTime: IndexedSeq[Double],
+ val resultSerializationTime: IndexedSeq[Double],
+ val memoryBytesSpilled: IndexedSeq[Double],
+ val diskBytesSpilled: IndexedSeq[Double],
+
+ val inputMetrics: Option[InputMetricDistributions],
+ val outputMetrics: Option[OutputMetricDistributions],
+ val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
+ val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions])
+
+class InputMetricDistributions private[spark](
+ val bytesRead: IndexedSeq[Double],
+ val recordsRead: IndexedSeq[Double])
+
+class OutputMetricDistributions private[spark](
+ val bytesWritten: IndexedSeq[Double],
+ val recordsWritten: IndexedSeq[Double])
+
+class ShuffleReadMetricDistributions private[spark](
+ val readBytes: IndexedSeq[Double],
+ val readRecords: IndexedSeq[Double],
+ val remoteBlocksFetched: IndexedSeq[Double],
+ val localBlocksFetched: IndexedSeq[Double],
+ val fetchWaitTime: IndexedSeq[Double],
+ val remoteBytesRead: IndexedSeq[Double],
+ val totalBlocksFetched: IndexedSeq[Double])
+
+class ShuffleWriteMetricDistributions private[spark](
+ val writeBytes: IndexedSeq[Double],
+ val writeRecords: IndexedSeq[Double],
+ val writeTime: IndexedSeq[Double])
+
+class AccumulableInfo private[spark](
+ val id: Long,
+ val name: String,
+ val update: Option[String],
+ val value: String)
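These classes are plain immutable holders whose public vals serialize naturally to the JSON shown in the expectation files below. As an illustration only (Jackson with the Scala module is an assumption here, not something this diff shows), a class in the same style can be rendered like:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    class Point(val x: Int, val y: Int) // hypothetical holder in the same style

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    println(mapper.writeValueAsString(new Point(1, 2))) // {"x":1,"y":2}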
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 7d75929..ec71148 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -25,13 +25,17 @@ import org.apache.spark.scheduler._
/**
* :: DeveloperApi ::
* A SparkListener that maintains executor storage status.
+ *
+ * This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
class StorageStatusListener extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
- def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq
+ def storageStatusList: Seq[StorageStatus] = synchronized {
+ executorIdToStorageStatus.values.toSeq
+ }
/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index a5271f0..bfe4a18 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,6 +17,9 @@
package org.apache.spark.ui
+import java.util.Date
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -33,7 +36,7 @@ import org.apache.spark.ui.scope.RDDOperationGraphListener
private[spark] class SparkUI private (
val sc: Option[SparkContext],
val conf: SparkConf,
- val securityManager: SecurityManager,
+ securityManager: SecurityManager,
val environmentListener: EnvironmentListener,
val storageStatusListener: StorageStatusListener,
val executorsListener: ExecutorsListener,
@@ -41,22 +44,27 @@ private[spark] class SparkUI private (
val storageListener: StorageListener,
val operationGraphListener: RDDOperationGraphListener,
var appName: String,
- val basePath: String)
+ val basePath: String,
+ val startTime: Long)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
- with Logging {
+ with Logging
+ with UIRoot {
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
+
+ val stagesTab = new StagesTab(this)
+
/** Initialize all components of the server. */
def initialize() {
attachTab(new JobsTab(this))
- val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
+ attachHandler(JsonRootResource.getJsonServlet(this))
// This should be POST only, but the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages", stagesTab.handleKillRequest,
@@ -83,6 +91,24 @@ private[spark] class SparkUI private (
private[spark] def appUIHostPort = publicHostName + ":" + boundPort
private[spark] def appUIAddress = s"http://$appUIHostPort"
+
+ def getSparkUI(appId: String): Option[SparkUI] = {
+ if (appId == appName) Some(this) else None
+ }
+
+ def getApplicationInfoList: Iterator[ApplicationInfo] = {
+ Iterator(new ApplicationInfo(
+ id = appName,
+ name = appName,
+ attempts = Seq(new ApplicationAttemptInfo(
+ attemptId = None,
+ startTime = new Date(startTime),
+ endTime = new Date(-1),
+ sparkUser = "",
+ completed = false
+ ))
+ ))
+ }
}
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
@@ -109,9 +135,10 @@ private[spark] object SparkUI {
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
- appName: String): SparkUI = {
+ appName: String,
+ startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
- jobProgressListener = Some(jobProgressListener))
+ jobProgressListener = Some(jobProgressListener), startTime = startTime)
}
def createHistoryUI(
@@ -119,8 +146,9 @@ private[spark] object SparkUI {
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
- basePath: String): SparkUI = {
- create(None, conf, listenerBus, securityManager, appName, basePath)
+ basePath: String,
+ startTime: Long): SparkUI = {
+ create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
}
/**
@@ -137,7 +165,8 @@ private[spark] object SparkUI {
securityManager: SecurityManager,
appName: String,
basePath: String = "",
- jobProgressListener: Option[JobProgressListener] = None): SparkUI = {
+ jobProgressListener: Option[JobProgressListener] = None,
+ startTime: Long): SparkUI = {
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
@@ -159,6 +188,6 @@ private[spark] object SparkUI {
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
- appName, basePath)
+ appName, basePath, startTime)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index f9860d1..384f2ad 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
* pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
*/
private[spark] abstract class WebUI(
- securityManager: SecurityManager,
+ val securityManager: SecurityManager,
port: Int,
conf: SparkConf,
basePath: String = "",
@@ -77,15 +77,9 @@ private[spark] abstract class WebUI(
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, basePath)
- val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
- (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
attachHandler(renderHandler)
- attachHandler(renderJsonHandler)
pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
.append(renderHandler)
- pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
- .append(renderJsonHandler)
-
}
/** Attach a handler to this UI. */
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 956608d..b247e4c 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -22,11 +22,11 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
+import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
-/** Summary information about an executor to display in the UI. */
-// Needs to be private[ui] because of a false positive MiMa failure.
+// This is no longer used, but must be kept because of a MiMa false positive
private[ui] case class ExecutorSummaryInfo(
id: String,
hostPort: String,
@@ -44,6 +44,7 @@ private[ui] case class ExecutorSummaryInfo(
maxMemory: Long,
executorLogs: Map[String, String])
+
private[ui] class ExecutorsPage(
parent: ExecutorsTab,
threadDumpEnabled: Boolean)
@@ -55,7 +56,8 @@ private[ui] class ExecutorsPage(
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
- val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
+ val execInfo = for (statusId <- 0 until storageStatusList.size) yield
+ ExecutorsPage.getExecInfo(listener, statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
@@ -111,7 +113,7 @@ private[ui] class ExecutorsPage(
}
/** Render an HTML row representing an executor */
- private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
+ private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
@@ -170,8 +172,11 @@ private[ui] class ExecutorsPage(
</tr>
}
+}
+
+private[spark] object ExecutorsPage {
/** Represent an executor's info as a map given a storage status index */
- private def getExecInfo(statusId: Int): ExecutorSummaryInfo = {
+ def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
@@ -189,7 +194,7 @@ private[ui] class ExecutorsPage(
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
- new ExecutorSummaryInfo(
+ new ExecutorSummary(
execId,
hostPort,
rddBlocks,
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index f6abf27..09323d1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -271,6 +271,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val shouldShowCompletedJobs = completedJobs.nonEmpty
val shouldShowFailedJobs = failedJobs.nonEmpty
+ val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) {
+ s"${completedJobs.size}"
+ } else {
+ s"${listener.numCompletedJobs}, only showing ${completedJobs.size}"
+ }
+
val summary: NodeSeq =
<div>
<ul class="unstyled">
@@ -295,9 +301,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
{
if (shouldShowCompletedJobs) {
- <li>
+ <li id="completed-summary">
<a href="#completed"><strong>Completed Jobs:</strong></a>
- {completedJobs.size}
+ {completedJobNumStr}
</li>
}
}
@@ -305,7 +311,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
if (shouldShowFailedJobs) {
<li>
<a href="#failed"><strong>Failed Jobs:</strong></a>
- {failedJobs.size}
+ {listener.numFailedJobs}
</li>
}
}
@@ -322,7 +328,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
activeJobsTable
}
if (shouldShowCompletedJobs) {
- content ++= <h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++
+ content ++= <h4 id="completed">Completed Jobs ({completedJobNumStr})</h4> ++
completedJobsTable
}
if (shouldShowFailedJobs) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index 236bc8e..a37f739 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -64,6 +64,12 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val shouldShowCompletedStages = completedStages.nonEmpty
val shouldShowFailedStages = failedStages.nonEmpty
+ val completedStageNumStr = if (numCompletedStages == completedStages.size) {
+ s"$numCompletedStages"
+ } else {
+ s"$numCompletedStages, only showing ${completedStages.size}"
+ }
+
val summary: NodeSeq =
<div>
<ul class="unstyled">
@@ -98,9 +104,9 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
}
{
if (shouldShowCompletedStages) {
- <li>
+ <li id="completed-summary">
<a href="#completed"><strong>Completed Stages:</strong></a>
- {numCompletedStages}
+ {completedStageNumStr}
</li>
}
}
@@ -132,7 +138,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
pendingStagesTable.toNodeSeq
}
if (shouldShowCompletedStages) {
- content ++= <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
+ content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++
completedStagesTable.toNodeSeq
}
if (shouldShowFailedStages) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 96cc3d7..7163217 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -187,7 +187,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
val jobDataOption = listener.jobIdToData.get(jobId)
if (jobDataOption.isEmpty) {
val content =
- <div>
+ <div id="no-info">
<p>No information to display for job {jobId}</p>
</div>
return UIUtils.headerSparkPage(
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8f9aa9f..246e191 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -74,6 +74,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// JobProgressListener's retention limits.
var numCompletedStages = 0
var numFailedStages = 0
+ var numCompletedJobs = 0
+ var numFailedJobs = 0
// Misc:
val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
@@ -217,10 +219,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
completedJobs += jobData
trimJobsIfNecessary(completedJobs)
jobData.status = JobExecutionStatus.SUCCEEDED
+ numCompletedJobs += 1
case JobFailed(exception) =>
failedJobs += jobData
trimJobsIfNecessary(failedJobs)
jobData.status = JobExecutionStatus.FAILED
+ numFailedJobs += 1
}
for (stageId <- jobData.stageIds) {
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index d725b9d..f3e0b38 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.scheduler.{Schedulable, StageInfo}
+import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing specific pool details */
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 5793100..89d175b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -25,11 +25,11 @@ import scala.xml.{Elem, Node, Unparsed}
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.ui.scope.RDDOperationGraph
import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
@@ -48,14 +48,22 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val stageAttemptId = parameterAttempt.toInt
val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId))
- if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
+ val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
+ if (stageDataOption.isEmpty) {
+ val content =
+ <div id="no-info">
+ <p>No information to display for Stage {stageId} (Attempt {stageAttemptId})</p>
+ </div>
+ return UIUtils.headerSparkPage(stageHeader, content, parent)
+
+ }
+ if (stageDataOption.get.taskData.isEmpty) {
val content =
<div>
<h4>Summary Metrics</h4> No tasks have started yet
<h4>Tasks</h4> No tasks have started yet
</div>
- return UIUtils.headerSparkPage(
- s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent)
+ return UIUtils.headerSparkPage(stageHeader, content, parent)
}
val stageData = stageDataOption.get
@@ -446,8 +454,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
maybeAccumulableTable ++
<h4>Tasks</h4> ++ taskTable
- UIUtils.headerSparkPage(
- "Details for Stage %d".format(stageId), content, parent, showVisualization = true)
+ UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 199f731..05f94a7 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -21,8 +21,8 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
@@ -32,28 +32,19 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
def render(request: HttpServletRequest): Seq[Node] = {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-
val rddId = parameterId.toInt
- val storageStatusList = listener.storageStatusList
- val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
- // Rather than crashing, render an "RDD Not Found" page
- return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
- }
+ val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
+ .getOrElse {
+ // Rather than crashing, render an "RDD Not Found" page
+ return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
+ }
// Worker table
- val workers = storageStatusList.map((rddId, _))
- val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers,
- id = Some("rdd-storage-by-worker-table"))
+ val workerTable = UIUtils.listingTable(workerHeader, workerRow,
+ rddStorageInfo.dataDistribution.get, id = Some("rdd-storage-by-worker-table"))
// Block table
- val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
- val blocks = storageStatusList
- .flatMap(_.rddBlocksById(rddId))
- .sortWith(_._1.name < _._1.name)
- .map { case (blockId, status) =>
- (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
- }
- val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks,
+ val blockTable = UIUtils.listingTable(blockHeader, blockRow, rddStorageInfo.partitions.get,
id = Some("rdd-storage-by-block-table"))
val content =
@@ -62,23 +53,23 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<ul class="unstyled">
<li>
<strong>Storage Level:</strong>
- {rddInfo.storageLevel.description}
+ {rddStorageInfo.storageLevel}
</li>
<li>
<strong>Cached Partitions:</strong>
- {rddInfo.numCachedPartitions}
+ {rddStorageInfo.numCachedPartitions}
</li>
<li>
<strong>Total Partitions:</strong>
- {rddInfo.numPartitions}
+ {rddStorageInfo.numPartitions}
</li>
<li>
<strong>Memory Size:</strong>
- {Utils.bytesToString(rddInfo.memSize)}
+ {Utils.bytesToString(rddStorageInfo.memoryUsed)}
</li>
<li>
<strong>Disk Size:</strong>
- {Utils.bytesToString(rddInfo.diskSize)}
+ {Utils.bytesToString(rddStorageInfo.diskUsed)}
</li>
</ul>
</div>
@@ -86,19 +77,19 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<div class="row-fluid">
<div class="span12">
- <h4> Data Distribution on {workers.size} Executors </h4>
+ <h4> Data Distribution on {rddStorageInfo.dataDistribution.get.size} Executors </h4>
{workerTable}
</div>
</div>
<div class="row-fluid">
<div class="span12">
- <h4> {blocks.size} Partitions </h4>
+ <h4> {rddStorageInfo.partitions.get.size} Partitions </h4>
{blockTable}
</div>
</div>;
- UIUtils.headerSparkPage("RDD Storage Info for " + rddInfo.name, content, parent)
+ UIUtils.headerSparkPage("RDD Storage Info for " + rddStorageInfo.name, content, parent)
}
/** Header fields for the worker table */
@@ -116,34 +107,32 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
"Executors")
/** Render an HTML row representing a worker */
- private def workerRow(worker: (Int, StorageStatus)): Seq[Node] = {
- val (rddId, status) = worker
+ private def workerRow(worker: RDDDataDistribution): Seq[Node] = {
<tr>
- <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+ <td>{worker.address}</td>
<td>
- {Utils.bytesToString(status.memUsedByRdd(rddId))}
- ({Utils.bytesToString(status.memRemaining)} Remaining)
+ {Utils.bytesToString(worker.memoryUsed)}
+ ({Utils.bytesToString(worker.memoryRemaining)} Remaining)
</td>
- <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td>
+ <td>{Utils.bytesToString(worker.diskUsed)}</td>
</tr>
}
/** Render an HTML row representing a block */
- private def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = {
- val (id, block, locations) = row
+ private def blockRow(row: RDDPartitionInfo): Seq[Node] = {
<tr>
- <td>{id}</td>
+ <td>{row.blockName}</td>
<td>
- {block.storageLevel.description}
+ {row.storageLevel}
</td>
- <td sorttable_customkey={block.memSize.toString}>
- {Utils.bytesToString(block.memSize)}
+ <td sorttable_customkey={row.memoryUsed.toString}>
+ {Utils.bytesToString(row.memoryUsed)}
</td>
- <td sorttable_customkey={block.diskSize.toString}>
- {Utils.bytesToString(block.diskSize)}
+ <td sorttable_customkey={row.diskUsed.toString}>
+ {Utils.bytesToString(row.diskUsed)}
</td>
<td>
- {locations.map(l => <span>{l}<br/></span>)}
+ {row.executors.map(l => <span>{l}<br/></span>)}
</td>
</tr>
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 59dc6b5..07db783 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Page showing list of RDD's currently stored in the cluster */
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 045bd78..0351749 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -35,6 +35,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag
/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the BlockManagerUI.
+ *
+ * This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
@@ -43,7 +45,9 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
/** Filter RDD info to include only those with cached partitions */
- def rddInfoList: Seq[RDDInfo] = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
+ def rddInfoList: Seq[RDDInfo] = synchronized {
+ _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
+ }
/** Update the storage info of the RDDs whose blocks are among the given updated blocks */
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/json_expectation
new file mode 100644
index 0000000..6101177
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/json_expectation
@@ -0,0 +1,53 @@
+[ {
+ "id" : "local-1427397477963",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "startTime" : "2015-03-26T19:17:57.184GMT",
+ "endTime" : "2015-03-26T19:20:02.949GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}, {
+ "id" : "local-1426533911241",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "attemptId" : "2",
+ "startTime" : "2015-03-17T23:11:50.242GMT",
+ "endTime" : "2015-03-17T23:12:25.177GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ }, {
+ "attemptId" : "1",
+ "startTime" : "2015-03-16T19:25:10.242GMT",
+ "endTime" : "2015-03-16T19:25:45.177GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}, {
+ "id" : "local-1425081759269",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "startTime" : "2015-02-28T00:02:38.277GMT",
+ "endTime" : "2015-02-28T00:02:46.912GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}, {
+ "id" : "local-1422981780767",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "startTime" : "2015-02-03T16:42:59.720GMT",
+ "endTime" : "2015-02-03T16:43:08.731GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}, {
+ "id" : "local-1422981759269",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "startTime" : "2015-02-03T16:42:38.277GMT",
+ "endTime" : "2015-02-03T16:42:46.912GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation
new file mode 100644
index 0000000..cb622e1
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation
@@ -0,0 +1,17 @@
+[ {
+ "id" : "<driver>",
+ "hostPort" : "localhost:57971",
+ "rddBlocks" : 8,
+ "memoryUsed" : 28000128,
+ "diskUsed" : 0,
+ "activeTasks" : 0,
+ "failedTasks" : 1,
+ "completedTasks" : 31,
+ "totalTasks" : 32,
+ "totalDuration" : 8820,
+ "totalInputBytes" : 28000288,
+ "totalShuffleRead" : 0,
+ "totalShuffleWrite" : 13180,
+ "maxMemory" : 278302556,
+ "executorLogs" : { }
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation
new file mode 100644
index 0000000..4a29072
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation
@@ -0,0 +1,15 @@
+{
+ "jobId" : 0,
+ "name" : "count at <console>:15",
+ "stageIds" : [ 0 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation
new file mode 100644
index 0000000..cab4750
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation
@@ -0,0 +1,43 @@
+[ {
+ "jobId" : 2,
+ "name" : "count at <console>:17",
+ "stageIds" : [ 3 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+}, {
+ "jobId" : 1,
+ "name" : "count at <console>:20",
+ "stageIds" : [ 1, 2 ],
+ "status" : "FAILED",
+ "numTasks" : 16,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 15,
+ "numSkippedTasks" : 15,
+ "numFailedTasks" : 1,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 1
+}, {
+ "jobId" : 0,
+ "name" : "count at <console>:15",
+ "stageIds" : [ 0 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation
new file mode 100644
index 0000000..cab4750
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation
@@ -0,0 +1,43 @@
+[ {
+ "jobId" : 2,
+ "name" : "count at <console>:17",
+ "stageIds" : [ 3 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+}, {
+ "jobId" : 1,
+ "name" : "count at <console>:20",
+ "stageIds" : [ 1, 2 ],
+ "status" : "FAILED",
+ "numTasks" : 16,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 15,
+ "numSkippedTasks" : 15,
+ "numFailedTasks" : 1,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 1
+}, {
+ "jobId" : 0,
+ "name" : "count at <console>:15",
+ "stageIds" : [ 0 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation
new file mode 100644
index 0000000..6fd25be
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation
@@ -0,0 +1,29 @@
+[ {
+ "jobId" : 2,
+ "name" : "count at <console>:17",
+ "stageIds" : [ 3 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+}, {
+ "jobId" : 0,
+ "name" : "count at <console>:15",
+ "stageIds" : [ 0 ],
+ "status" : "SUCCEEDED",
+ "numTasks" : 8,
+ "numActiveTasks" : 0,
+ "numCompletedTasks" : 8,
+ "numSkippedTasks" : 8,
+ "numFailedTasks" : 0,
+ "numActiveStages" : 0,
+ "numCompletedStages" : 1,
+ "numSkippedStages" : 0,
+ "numFailedStages" : 0
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation
new file mode 100644
index 0000000..07489ad
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation
@@ -0,0 +1,10 @@
+{
+ "id" : "local-1422981780767",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "startTime" : "2015-02-03T16:42:59.720GMT",
+ "endTime" : "2015-02-03T16:43:08.731GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation
new file mode 100644
index 0000000..111cb81
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation
@@ -0,0 +1,270 @@
+{
+ "status" : "COMPLETE",
+ "stageId" : 1,
+ "attemptId" : 0,
+ "numActiveTasks" : 0,
+ "numCompleteTasks" : 8,
+ "numFailedTasks" : 0,
+ "executorRunTime" : 3476,
+ "inputBytes" : 28000128,
+ "inputRecords" : 0,
+ "outputBytes" : 0,
+ "outputRecords" : 0,
+ "shuffleReadBytes" : 0,
+ "shuffleReadRecords" : 0,
+ "shuffleWriteBytes" : 13180,
+ "shuffleWriteRecords" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "name" : "map at <console>:14",
+ "details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIM
ain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
+ "schedulingPool" : "default",
+ "accumulatorUpdates" : [ ],
+ "tasks" : {
+ "8" : {
+ "taskId" : 8,
+ "index" : 0,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.829GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 1,
+ "executorRunTime" : 435,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 2,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 94000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "11" : {
+ "taskId" : 11,
+ "index" : 3,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.830GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1647,
+ "writeTime" : 83000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "14" : {
+ "taskId" : 14,
+ "index" : 6,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.832GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 88000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "13" : {
+ "taskId" : 13,
+ "index" : 5,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.831GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 2,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 73000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "10" : {
+ "taskId" : 10,
+ "index" : 2,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.830GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 76000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "9" : {
+ "taskId" : 9,
+ "index" : 1,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.830GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 1,
+ "executorRunTime" : 436,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 98000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "12" : {
+ "taskId" : 12,
+ "index" : 4,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.831GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1645,
+ "writeTime" : 101000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "15" : {
+ "taskId" : 15,
+ "index" : 7,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.833GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 1,
+ "executorRunTime" : 435,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 79000,
+ "recordsWritten" : 0
+ }
+ }
+ }
+ },
+ "executorSummary" : {
+ "<driver>" : {
+ "taskTime" : 3624,
+ "failedTasks" : 0,
+ "succeededTasks" : 8,
+ "inputBytes" : 28000128,
+ "outputBytes" : 0,
+ "shuffleRead" : 0,
+ "shuffleWrite" : 13180,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0
+ }
+ }
+}
\ No newline at end of file
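Note: in the single-stage-attempt payload above, the stage-level shuffleWriteBytes is the sum of the per-task bytesWritten values. A minimal consistency check, with the eight values copied from the tasks map (taskIds 8 through 15):

object ShuffleWriteCheck {
  def main(args: Array[String]): Unit = {
    // bytesWritten for taskIds 8, 9, 10, 11, 12, 13, 14, 15.
    val perTask = Seq(1648L, 1648L, 1648L, 1647L, 1645L, 1648L, 1648L, 1648L)
    // Matches the stage-level "shuffleWriteBytes" : 13180 above.
    assert(perTask.sum == 13180L)
    println(s"stage shuffleWriteBytes = ${perTask.sum}")
  }
}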
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation
new file mode 100644
index 0000000..ef339f8
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation
@@ -0,0 +1,270 @@
+[ {
+ "status" : "COMPLETE",
+ "stageId" : 1,
+ "attemptId" : 0,
+ "numActiveTasks" : 0,
+ "numCompleteTasks" : 8,
+ "numFailedTasks" : 0,
+ "executorRunTime" : 3476,
+ "inputBytes" : 28000128,
+ "inputRecords" : 0,
+ "outputBytes" : 0,
+ "outputRecords" : 0,
+ "shuffleReadBytes" : 0,
+ "shuffleReadRecords" : 0,
+ "shuffleWriteBytes" : 13180,
+ "shuffleWriteRecords" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "name" : "map at <console>:14",
+ "details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIM
ain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
+ "schedulingPool" : "default",
+ "accumulatorUpdates" : [ ],
+ "tasks" : {
+ "8" : {
+ "taskId" : 8,
+ "index" : 0,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.829GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 1,
+ "executorRunTime" : 435,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 2,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 94000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "11" : {
+ "taskId" : 11,
+ "index" : 3,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.830GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1647,
+ "writeTime" : 83000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "14" : {
+ "taskId" : 14,
+ "index" : 6,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.832GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 88000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "13" : {
+ "taskId" : 13,
+ "index" : 5,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.831GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 2,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 73000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "10" : {
+ "taskId" : 10,
+ "index" : 2,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.830GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 76000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "9" : {
+ "taskId" : 9,
+ "index" : 1,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.830GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 1,
+ "executorRunTime" : 436,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 98000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "12" : {
+ "taskId" : 12,
+ "index" : 4,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.831GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 2,
+ "executorRunTime" : 434,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1645,
+ "writeTime" : 101000,
+ "recordsWritten" : 0
+ }
+ }
+ },
+ "15" : {
+ "taskId" : 15,
+ "index" : 7,
+ "attempt" : 0,
+ "launchTime" : "2015-02-03T16:43:05.833GMT",
+ "executorId" : "<driver>",
+ "host" : "localhost",
+ "taskLocality" : "PROCESS_LOCAL",
+ "speculative" : false,
+ "accumulatorUpdates" : [ ],
+ "taskMetrics" : {
+ "executorDeserializeTime" : 1,
+ "executorRunTime" : 435,
+ "resultSize" : 1902,
+ "jvmGcTime" : 19,
+ "resultSerializationTime" : 1,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "inputMetrics" : {
+ "bytesRead" : 3500016,
+ "recordsRead" : 0
+ },
+ "shuffleWriteMetrics" : {
+ "bytesWritten" : 1648,
+ "writeTime" : 79000,
+ "recordsWritten" : 0
+ }
+ }
+ }
+ },
+ "executorSummary" : {
+ "<driver>" : {
+ "taskTime" : 3624,
+ "failedTasks" : 0,
+ "succeededTasks" : 8,
+ "inputBytes" : 28000128,
+ "outputBytes" : 0,
+ "shuffleRead" : 0,
+ "shuffleWrite" : 13180,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0
+ }
+ }
+} ]
\ No newline at end of file
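Note: unlike /stages/{stageId}/{attemptId}, which returns a single object, /stages/{stageId} returns a JSON array with one element per attempt, as above. A minimal sketch contrasting the two shapes, with host and port assumed as before:

import scala.io.Source

object StageEndpointShapes {
  def main(args: Array[String]): Unit = {
    val base = "http://localhost:18080/api/v1/applications/local-1422981780767"
    val allAttempts = Source.fromURL(s"$base/stages/1", "UTF-8").mkString
    val oneAttempt  = Source.fromURL(s"$base/stages/1/0", "UTF-8").mkString
    println(allAttempts.trim.startsWith("["))  // true: array of attempts
    println(oneAttempt.trim.startsWith("{"))   // true: single attempt object
  }
}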
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation
new file mode 100644
index 0000000..056fac7
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation
@@ -0,0 +1,89 @@
+[ {
+ "status" : "COMPLETE",
+ "stageId" : 3,
+ "attemptId" : 0,
+ "numActiveTasks" : 0,
+ "numCompleteTasks" : 8,
+ "numFailedTasks" : 0,
+ "executorRunTime" : 162,
+ "inputBytes" : 160,
+ "inputRecords" : 0,
+ "outputBytes" : 0,
+ "outputRecords" : 0,
+ "shuffleReadBytes" : 0,
+ "shuffleReadRecords" : 0,
+ "shuffleWriteBytes" : 0,
+ "shuffleWriteRecords" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "name" : "count at <console>:17",
+ "details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line19.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)\n$line19.$read$$iwC$$iwC$$iwC.<init>(<console>:22)\n$line19.$read$$iwC$$iwC.<init>(<console>:24)\n$line19.$read$$iwC.<init>(<console>:26)\n$line19.$read.<init>(<console>:28)\n$line19.$read$.<init>(<console>:32)\n$line19.$read$.<clinit>(<console>)\n$line19.$eval$.<init>(<console>:7)\n$line19.$eval$.<clinit>(<console>)\n$line19.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.Spark
IMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
+ "schedulingPool" : "default",
+ "accumulatorUpdates" : [ ]
+}, {
+ "status" : "COMPLETE",
+ "stageId" : 1,
+ "attemptId" : 0,
+ "numActiveTasks" : 0,
+ "numCompleteTasks" : 8,
+ "numFailedTasks" : 0,
+ "executorRunTime" : 3476,
+ "inputBytes" : 28000128,
+ "inputRecords" : 0,
+ "outputBytes" : 0,
+ "outputRecords" : 0,
+ "shuffleReadBytes" : 0,
+ "shuffleReadRecords" : 0,
+ "shuffleWriteBytes" : 13180,
+ "shuffleWriteRecords" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "name" : "map at <console>:14",
+ "details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIM
ain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
+ "schedulingPool" : "default",
+ "accumulatorUpdates" : [ ]
+}, {
+ "status" : "COMPLETE",
+ "stageId" : 0,
+ "attemptId" : 0,
+ "numActiveTasks" : 0,
+ "numCompleteTasks" : 8,
+ "numFailedTasks" : 0,
+ "executorRunTime" : 4338,
+ "inputBytes" : 0,
+ "inputRecords" : 0,
+ "outputBytes" : 0,
+ "outputRecords" : 0,
+ "shuffleReadBytes" : 0,
+ "shuffleReadRecords" : 0,
+ "shuffleWriteBytes" : 0,
+ "shuffleWriteRecords" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "name" : "count at <console>:15",
+ "details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)\n$line9.$read$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line9.$read$$iwC$$iwC.<init>(<console>:22)\n$line9.$read$$iwC.<init>(<console>:24)\n$line9.$read.<init>(<console>:26)\n$line9.$read$.<init>(<console>:30)\n$line9.$read$.<clinit>(<console>)\n$line9.$eval$.<init>(<console>:7)\n$line9.$eval$.<clinit>(<console>)\n$line9.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.inte
rpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
+ "schedulingPool" : "default",
+ "accumulatorUpdates" : [ ]
+}, {
+ "status" : "FAILED",
+ "stageId" : 2,
+ "attemptId" : 0,
+ "numActiveTasks" : 0,
+ "numCompleteTasks" : 7,
+ "numFailedTasks" : 1,
+ "executorRunTime" : 278,
+ "inputBytes" : 0,
+ "inputRecords" : 0,
+ "outputBytes" : 0,
+ "outputRecords" : 0,
+ "shuffleReadBytes" : 0,
+ "shuffleReadRecords" : 0,
+ "shuffleWriteBytes" : 0,
+ "shuffleWriteRecords" : 0,
+ "memoryBytesSpilled" : 0,
+ "diskBytesSpilled" : 0,
+ "name" : "count at <console>:20",
+ "details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.Spark
IMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
+ "schedulingPool" : "default",
+ "accumulatorUpdates" : [ ]
+} ]
\ No newline at end of file
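Note: the stage listing above covers all four stages regardless of status. Summing their executorRunTime fields gives the total executor run time across the four stages, a quick sanity check on the numbers:

object StageRunTimeTotals {
  def main(args: Array[String]): Unit = {
    // executorRunTime per stage, copied from the listing (stageId -> ms).
    val runTimes = Map(3 -> 162L, 1 -> 3476L, 0 -> 4338L, 2 -> 278L)
    // 162 + 3476 + 4338 + 278 = 8254 ms.
    println(s"total executor run time: ${runTimes.values.sum} ms")
  }
}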