You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/04/30 03:22:19 UTC

spark git commit: [SPARK-6862] [STREAMING] [WEBUI] Add BatchPage to display details of a batch

Repository: spark
Updated Branches:
  refs/heads/master 114bad606 -> 1b7106b86


[SPARK-6862] [STREAMING] [WEBUI] Add BatchPage to display details of a batch

This is an initial commit for SPARK-6862. Once SPARK-6796 is merged, I will add the links to StreamingPage so that the user can jump to BatchPage.

Screenshots:
![success](https://cloud.githubusercontent.com/assets/1000778/7102439/bbe75406-e0b3-11e4-84fe-3e6de629a49a.png)
![failure](https://cloud.githubusercontent.com/assets/1000778/7102440/bc124454-e0b3-11e4-921a-c8b39d6b61bc.png)

Author: zsxwing <zs...@gmail.com>

Closes #5473 from zsxwing/SPARK-6862 and squashes the following commits:

0727d35 [zsxwing] Change BatchUIData to a case class
b380cfb [zsxwing] Add createJobStart to eliminate duplicate codes
9a3083d [zsxwing] Rename XxxDatas -> XxxData
087ba98 [zsxwing] Refactor BatchInfo to store only necessary fields
cb62e4f [zsxwing] Use Seq[(OutputOpId, SparkJobId)] to store the id relations
72f8e7e [zsxwing] Add unit tests for BatchPage
1282b10 [zsxwing] Handle some corner cases and add tests for StreamingJobProgressListener
77a69ae [zsxwing] Refactor codes as per TD's comments
35ffd80 [zsxwing] Merge branch 'master' into SPARK-6862
15bdf9b [zsxwing] Add batch links and unit tests
4bf66b6 [zsxwing] Merge branch 'master' into SPARK-6862
7168807 [zsxwing] Limit the max width of the error message and fix nits in the UI
0b226f9 [zsxwing] Change 'Last Error' to 'Error'
fc98a43 [zsxwing] Put clearing local properties to finally and remove redundant private[streaming]
0c7b2eb [zsxwing] Add BatchPage to display details of a batch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b7106b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b7106b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b7106b8

Branch: refs/heads/master
Commit: 1b7106b867bc0aa4d64b669d79b646f862acaf47
Parents: 114bad6
Author: zsxwing <zs...@gmail.com>
Authored: Wed Apr 29 18:22:14 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 29 18:22:14 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   2 +-
 .../spark/streaming/dstream/DStream.scala       |   2 +-
 .../apache/spark/streaming/scheduler/Job.scala  |  44 +++-
 .../streaming/scheduler/JobScheduler.scala      |  28 +-
 .../spark/streaming/scheduler/JobSet.scala      |   2 +-
 .../spark/streaming/ui/AllBatchesTable.scala    |  26 +-
 .../apache/spark/streaming/ui/BatchPage.scala   | 264 +++++++++++++++++++
 .../apache/spark/streaming/ui/BatchUIData.scala |  75 ++++++
 .../ui/StreamingJobProgressListener.scala       | 161 +++++++----
 .../spark/streaming/ui/StreamingTab.scala       |   4 +-
 .../spark/streaming/UISeleniumSuite.scala       |  83 +++++-
 .../ui/StreamingJobProgressListenerSuite.scala  | 100 ++++++-
 12 files changed, 710 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 711a369..935c8a4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet
 
 import scala.collection.mutable.HashMap
 
-private[jobs] object UIData {
+private[spark] object UIData {
 
   class ExecutorSummary {
     var taskTime : Long = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 24f99a2..83d41f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
         println("Time: " + time)
         println("-------------------------------------------")
         firstNum.take(num).foreach(println)
-        if (firstNum.size > num) println("...")
+        if (firstNum.length > num) println("...")
         println()
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 30cf87f..3c481bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -25,15 +25,49 @@ import scala.util.Try
  */
 private[streaming]
 class Job(val time: Time, func: () => _) {
-  var id: String = _
-  var result: Try[_] = null
+  private var _id: String = _
+  private var _outputOpId: Int = _
+  private var isSet = false
+  private var _result: Try[_] = null
 
   def run() {
-    result = Try(func())
+    _result = Try(func())
   }
 
-  def setId(number: Int) {
-    id = "streaming job " + time + "." + number
+  def result: Try[_] = {
+    if (_result == null) {
+      throw new IllegalStateException("Cannot access result before job finishes")
+    }
+    _result
+  }
+
+  /**
+   * @return the global unique id of this Job.
+   */
+  def id: String = {
+    if (!isSet) {
+      throw new IllegalStateException("Cannot access id before calling setId")
+    }
+    _id
+  }
+
+  /**
+   * @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
+   */
+  def outputOpId: Int = {
+    if (!isSet) {
+      throw new IllegalStateException("Cannot access number before calling setId")
+    }
+    _outputOpId
+  }
+
+  def setOutputOpId(outputOpId: Int) {
+    if (isSet) {
+      throw new IllegalStateException("Cannot call setOutputOpId more than once")
+    }
+    isSet = true
+    _id = s"streaming job $time.$outputOpId"
+    _outputOpId = outputOpId
   }
 
   override def toString: String = id

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 508b892..c7a2c11 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -172,16 +172,28 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     ssc.waiter.notifyError(e)
   }
 
-  private class JobHandler(job: Job) extends Runnable {
+  private class JobHandler(job: Job) extends Runnable with Logging {
     def run() {
-      eventLoop.post(JobStarted(job))
-      // Disable checks for existing output directories in jobs launched by the streaming scheduler,
-      // since we may need to write output to an existing directory during checkpoint recovery;
-      // see SPARK-4835 for more details.
-      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-        job.run()
+      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+      try {
+        eventLoop.post(JobStarted(job))
+        // Disable checks for existing output directories in jobs launched by the streaming
+        // scheduler, since we may need to write output to an existing directory during checkpoint
+        // recovery; see SPARK-4835 for more details.
+        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          job.run()
+        }
+        eventLoop.post(JobCompleted(job))
+      } finally {
+        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
+        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
       }
-      eventLoop.post(JobCompleted(job))
     }
   }
 }
+
+private[streaming] object JobScheduler {
+  val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
+  val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 5b13487..24b3794 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -35,7 +35,7 @@ case class JobSet(
   private var processingStartTime = -1L // when the first job of this jobset started processing
   private var processingEndTime = -1L // when the last job of this jobset finished processing
 
-  jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+  jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
   incompleteJobs ++= jobs
 
   def handleJobStart(job: Job) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index df1c0a1..e219e27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui
 
 import scala.xml.Node
 
-import org.apache.spark.streaming.scheduler.BatchInfo
 import org.apache.spark.ui.UIUtils
 
 private[ui] abstract class BatchTableBase(tableId: String) {
@@ -31,18 +30,20 @@ private[ui] abstract class BatchTableBase(tableId: String) {
       <th>Processing Time</th>
   }
 
-  protected def baseRow(batch: BatchInfo): Seq[Node] = {
+  protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
     val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
-    val eventCount = batch.receivedBlockInfo.values.map {
-      receivers => receivers.map(_.numRecords).sum
-    }.sum
+    val eventCount = batch.numRecords
     val schedulingDelay = batch.schedulingDelay
     val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
     val processingTime = batch.processingDelay
     val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
 
-    <td sorttable_customkey={batchTime.toString}>{formattedBatchTime}</td>
+    <td sorttable_customkey={batchTime.toString}>
+      <a href={s"batch?id=$batchTime"}>
+        {formattedBatchTime}
+      </a>
+    </td>
       <td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
       <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
         {formattedSchedulingDelay}
@@ -73,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) {
   protected def renderRows: Seq[Node]
 }
 
-private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
-  extends BatchTableBase("active-batches-table") {
+private[ui] class ActiveBatchTable(
+    runningBatches: Seq[BatchUIData],
+    waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
 
   override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
 
@@ -85,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche
       runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
   }
 
-  private def runningBatchRow(batch: BatchInfo): Seq[Node] = {
+  private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
     baseRow(batch) ++ <td>processing</td>
   }
 
-  private def waitingBatchRow(batch: BatchInfo): Seq[Node] = {
+  private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
     baseRow(batch) ++ <td>queued</td>
   }
 }
 
-private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
   extends BatchTableBase("completed-batches-table") {
 
   override protected def columns: Seq[Node] = super.columns ++ <th>Total Delay</th>
@@ -103,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
     batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
   }
 
-  private def completedBatchRow(batch: BatchInfo): Seq[Node] = {
+  private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
     baseRow(batch) ++

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
new file mode 100644
index 0000000..2da9a29
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+
+private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+  private val streamingListener = parent.listener
+  private val sparkListener = parent.ssc.sc.jobProgressListener
+
+  private def columns: Seq[Node] = {
+    <th>Output Op Id</th>
+      <th>Description</th>
+      <th>Duration</th>
+      <th>Job Id</th>
+      <th>Duration</th>
+      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
+      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
+      <th>Error</th>
+  }
+
+  /**
+   * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
+   * one cell, we use "rowspan" for the first row of a output op.
+   */
+  def generateJobRow(
+      outputOpId: OutputOpId,
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      sparkJob: JobUIData): Seq[Node] = {
+    val lastStageInfo = Option(sparkJob.stageIds)
+      .filter(_.nonEmpty)
+      .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
+    val lastStageData = lastStageInfo.flatMap { s =>
+      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+    }
+
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+    val duration: Option[Long] = {
+      sparkJob.submissionTime.map { start =>
+        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+        end - start
+      }
+    }
+    val lastFailureReason =
+      sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+      dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
+      flatMap(info => info.failureReason).headOption.getOrElse("")
+    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
+    val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
+
+    // In the first row, output op id and its information needs to be shown. In other rows, these
+    // cells will be taken up due to "rowspan".
+    // scalastyle:off
+    val prefixCells =
+      if (isFirstRow) {
+        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>
+          <span class="description-input" title={lastStageDescription}>
+            {lastStageDescription}
+          </span>{lastStageName}
+        </td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+      } else {
+        Nil
+      }
+    // scalastyle:on
+
+    <tr>
+      {prefixCells}
+      <td sorttable_customkey={sparkJob.jobId.toString}>
+        <a href={detailUrl}>
+          {sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
+        </a>
+      </td>
+      <td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
+        {formattedDuration}
+      </td>
+      <td class="stage-progress-cell">
+        {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
+        {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
+        {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
+      </td>
+      <td class="progress-cell">
+        {
+          UIUtils.makeProgressBar(
+            started = sparkJob.numActiveTasks,
+            completed = sparkJob.numCompletedTasks,
+            failed = sparkJob.numFailedTasks,
+            skipped = sparkJob.numSkippedTasks,
+            total = sparkJob.numTasks - sparkJob.numSkippedTasks)
+        }
+      </td>
+      {failureReasonCell(lastFailureReason)}
+    </tr>
+  }
+
+  private def generateOutputOpIdRow(
+      outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
+    val sparkjobDurations = sparkJobs.map(sparkJob => {
+      sparkJob.submissionTime.map { start =>
+        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+        end - start
+      }
+    })
+    val formattedOutputOpDuration =
+      if (sparkjobDurations.exists(_ == None)) {
+        // If any job does not finish, set "formattedOutputOpDuration" to "-"
+        "-"
+      } else {
+        UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+      }
+    generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+      sparkJobs.tail.map { sparkJob =>
+        generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+      }.flatMap(x => x)
+  }
+
+  private def failureReasonCell(failureReason: String): Seq[Node] = {
+    val isMultiline = failureReason.indexOf('\n') >= 0
+    // Display the first line by default
+    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        failureReason.substring(0, failureReason.indexOf('\n'))
+      } else {
+        failureReason
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+        <div class="stacktrace-details collapsed">
+          <pre>{failureReason}</pre>
+        </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+  }
+
+  private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
+    sparkListener.activeJobs.get(sparkJobId).orElse {
+      sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
+        sparkListener.failedJobs.find(_.jobId == sparkJobId)
+      }
+    }
+  }
+
+  /**
+   * Generate the job table for the batch.
+   */
+  private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
+    val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
+      sortBy(_._1). // sorted by OutputOpId
+      map { case (outputOpId, outputOpIdAndSparkJobIds) =>
+        // sort SparkJobIds for each OutputOpId
+        (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
+      }
+    sparkListener.synchronized {
+      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+        outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
+          // Filter out spark Job ids that don't exist in sparkListener
+          (outputOpId, sparkJobIds.flatMap(getJobData))
+        }
+
+      <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
+        <thead>
+          {columns}
+        </thead>
+        <tbody>
+          {
+            outputOpIdWithJobs.map {
+              case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+            }
+          }
+        </tbody>
+      </table>
+    }
+  }
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
+      throw new IllegalArgumentException(s"Missing id parameter")
+    }
+    val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+
+    val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
+      throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
+    }
+
+    val formattedSchedulingDelay =
+      batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+    val formattedProcessingTime =
+      batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
+    val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+
+    val summary: NodeSeq =
+      <div>
+        <ul class="unstyled">
+          <li>
+            <strong>Batch Duration: </strong>
+            {UIUtils.formatDuration(streamingListener.batchDuration)}
+          </li>
+          <li>
+            <strong>Input data size: </strong>
+            {batchUIData.numRecords} records
+          </li>
+          <li>
+            <strong>Scheduling delay: </strong>
+            {formattedSchedulingDelay}
+          </li>
+          <li>
+            <strong>Processing time: </strong>
+            {formattedProcessingTime}
+          </li>
+          <li>
+            <strong>Total delay: </strong>
+            {formattedTotalDelay}
+          </li>
+        </ul>
+      </div>
+
+    val jobTable =
+      if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) {
+        <div>Cannot find any job for Batch {formattedBatchTime}.</div>
+      } else {
+        generateJobTable(batchUIData)
+      }
+
+    val content = summary ++ jobTable
+
+    UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
new file mode 100644
index 0000000..f45c291
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
+
+private[ui] case class BatchUIData(
+    val batchTime: Time,
+    val receiverNumRecords: Map[Int, Long],
+    val submissionTime: Long,
+    val processingStartTime: Option[Long],
+    val processingEndTime: Option[Long],
+    var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
+
+  /**
+   * Time taken for the first job of this batch to start processing from the time this batch
+   * was submitted to the streaming scheduler. Essentially, it is
+   * `processingStartTime` - `submissionTime`.
+   */
+  def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
+
+  /**
+   * Time taken for the all jobs of this batch to finish processing from the time they started
+   * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+   */
+  def processingDelay: Option[Long] = {
+    for (start <- processingStartTime;
+         end <- processingEndTime)
+      yield end - start
+  }
+
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they
+   * were submitted.  Essentially, it is `processingDelay` + `schedulingDelay`.
+   */
+  def totalDelay: Option[Long] = processingEndTime.map(_ - submissionTime)
+
+  /**
+   * The number of recorders received by the receivers in this batch.
+   */
+  def numRecords: Long = receiverNumRecords.map(_._2).sum
+}
+
+private[ui] object BatchUIData {
+
+  def apply(batchInfo: BatchInfo): BatchUIData = {
+    new BatchUIData(
+      batchInfo.batchTime,
+      batchInfo.receivedBlockInfo.mapValues(_.map(_.numRecords).sum),
+      batchInfo.submissionTime,
+      batchInfo.processingStartTime,
+      batchInfo.processingEndTime
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index be1e868..34b5571 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,29 +17,58 @@
 
 package org.apache.spark.streaming.ui
 
-import scala.collection.mutable.{Queue, HashMap}
+import java.util.LinkedHashMap
+import java.util.{Map => JMap}
+import java.util.Properties
 
+import scala.collection.mutable.{ArrayBuffer, Queue, HashMap, SynchronizedBuffer}
+
+import org.apache.spark.scheduler._
 import org.apache.spark.streaming.{Time, StreamingContext}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
-import org.apache.spark.streaming.scheduler.BatchInfo
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
 import org.apache.spark.util.Distribution
 
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
-  extends StreamingListener {
+  extends StreamingListener with SparkListener {
 
-  private val waitingBatchInfos = new HashMap[Time, BatchInfo]
-  private val runningBatchInfos = new HashMap[Time, BatchInfo]
-  private val completedBatchInfos = new Queue[BatchInfo]
-  private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
+  private val runningBatchUIData = new HashMap[Time, BatchUIData]
+  private val completedBatchUIData = new Queue[BatchUIData]
+  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
   private var totalProcessedRecords = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]
 
+  // Because onJobStart and onBatchXXX messages are processed in different threads,
+  // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
+  // cannot use a map of (Time, BatchUIData).
+  private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
+    new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
+      override def removeEldestEntry(
+          p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
+        // If a lot of "onBatchCompleted"s happen before "onJobStart" (image if
+        // SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
+        // may add some information for a removed batch when processing "onJobStart". It will be a
+        // memory leak.
+        //
+        // To avoid the memory leak, we control the size of "batchTimeToOutputOpIdToSparkJobIds" and
+        // evict the eldest one.
+        //
+        // Note: if "onJobStart" happens before "onBatchSubmitted", the size of
+        // "batchTimeToOutputOpIdToSparkJobIds" may be greater than the number of the retained
+        // batches temporarily, so here we use "10" to handle such case. This is not a perfect
+        // solution, but at least it can handle most of cases.
+        size() >
+          waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10
+      }
+    }
+
+
   val batchDuration = ssc.graph.batchDuration.milliseconds
 
   override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
@@ -62,37 +91,62 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
     synchronized {
-      waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+      waitingBatchUIData(batchSubmitted.batchInfo.batchTime) =
+        BatchUIData(batchSubmitted.batchInfo)
     }
   }
 
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
-    runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
-    waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+    val batchUIData = BatchUIData(batchStarted.batchInfo)
+    runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+    waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
 
-    batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
-      totalReceivedRecords += infos.map(_.numRecords).sum
-    }
+    totalReceivedRecords += batchUIData.numRecords
   }
 
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
     synchronized {
-      waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-      runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-      completedBatchInfos.enqueue(batchCompleted.batchInfo)
-      if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
+      waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+      runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+      val batchUIData = BatchUIData(batchCompleted.batchInfo)
+      completedBatchUIData.enqueue(batchUIData)
+      if (completedBatchUIData.size > batchUIDataLimit) {
+        val removedBatch = completedBatchUIData.dequeue()
+        batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
+      }
       totalCompletedBatches += 1L
 
-      batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
-        totalProcessedRecords += infos.map(_.numRecords).sum
+      totalProcessedRecords += batchUIData.numRecords
+    }
+  }
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+    getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
+      var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
+      if (outputOpIdToSparkJobIds == null) {
+        outputOpIdToSparkJobIds =
+          new ArrayBuffer[OutputOpIdAndSparkJobId]()
+            with SynchronizedBuffer[OutputOpIdAndSparkJobId]
+        batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
       }
+      outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
     }
   }
 
-  def numReceivers: Int = synchronized {
-    ssc.graph.getReceiverInputStreams().size
+  private def getBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = {
+    val batchTime = properties.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY)
+    if (batchTime == null) {
+      // Not submitted from JobScheduler
+      None
+    } else {
+      val outputOpId = properties.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY)
+      assert(outputOpId != null)
+      Some(Time(batchTime.toLong) -> outputOpId.toInt)
+    }
   }
 
+  def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
+
   def numTotalCompletedBatches: Long = synchronized {
     totalCompletedBatches
   }
@@ -106,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numUnprocessedBatches: Long = synchronized {
-    waitingBatchInfos.size + runningBatchInfos.size
+    waitingBatchUIData.size + runningBatchUIData.size
   }
 
-  def waitingBatches: Seq[BatchInfo] = synchronized {
-    waitingBatchInfos.values.toSeq
+  def waitingBatches: Seq[BatchUIData] = synchronized {
+    waitingBatchUIData.values.toSeq
   }
 
-  def runningBatches: Seq[BatchInfo] = synchronized {
-    runningBatchInfos.values.toSeq
+  def runningBatches: Seq[BatchUIData] = synchronized {
+    runningBatchUIData.values.toSeq
   }
 
-  def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
-    completedBatchInfos.toSeq
+  def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
+    completedBatchUIData.toSeq
   }
 
   def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -134,15 +188,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
-    val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
-    val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+    val latestBatches = retainedBatches.reverse.take(batchUIDataLimit)
     (0 until numReceivers).map { receiverId =>
-      val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
-        batchInfo.get(receiverId).getOrElse(Array.empty)
-      }
-      val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
-      // calculate records per second for each batch
-        blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+      val recordsOfParticularReceiver = latestBatches.map { batch =>
+        // calculate records per second for each batch
+        batch.receiverNumRecords.get(receiverId).sum.toDouble * 1000 / batchDuration
       }
       val distributionOption = Distribution(recordsOfParticularReceiver)
       (receiverId, distributionOption)
@@ -150,10 +200,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
-    val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+    val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords)
     lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
       (0 until numReceivers).map { receiverId =>
-        (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+        (receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L))
       }.toMap
     }.getOrElse {
       (0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
@@ -164,20 +214,39 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     receiverInfos.get(receiverId)
   }
 
-  def lastCompletedBatch: Option[BatchInfo] = synchronized {
-    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+  def lastCompletedBatch: Option[BatchUIData] = synchronized {
+    completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
   }
 
-  def lastReceivedBatch: Option[BatchInfo] = synchronized {
+  def lastReceivedBatch: Option[BatchUIData] = synchronized {
     retainedBatches.lastOption
   }
 
-  private def retainedBatches: Seq[BatchInfo] = {
-    (waitingBatchInfos.values.toSeq ++
-      runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
+  private def retainedBatches: Seq[BatchUIData] = {
+    (waitingBatchUIData.values.toSeq ++
+      runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
+  }
+
+  private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
+    Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
   }
 
-  private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
-    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+  def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
+    val batchUIData = waitingBatchUIData.get(batchTime).orElse {
+      runningBatchUIData.get(batchTime).orElse {
+        completedBatchUIData.find(batch => batch.batchTime == batchTime)
+      }
+    }
+    batchUIData.foreach { _batchUIData =>
+      val outputOpIdToSparkJobIds =
+        Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
+      _batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
+    }
+    batchUIData
   }
 }
+
+private[streaming] object StreamingJobProgressListener {
+  type SparkJobId = Int
+  type OutputOpId = Int
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 9a860ea..e403963 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -27,14 +27,16 @@ import StreamingTab._
  * Spark Web UI tab that shows statistics of a streaming job.
  * This assumes the given SparkContext has enabled its SparkUI.
  */
-private[spark] class StreamingTab(ssc: StreamingContext)
+private[spark] class StreamingTab(val ssc: StreamingContext)
   extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
 
   val parent = getSparkUI(ssc)
   val listener = ssc.progressListener
 
   ssc.addStreamingListener(listener)
+  ssc.sc.addSparkListener(listener)
   attachPage(new StreamingPage(this))
+  attachPage(new BatchPage(this))
   parent.attachTab(this)
 
   def detach() {

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 205ddf6..8de43ba 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming
 
+import scala.collection.mutable.Queue
+
 import org.openqa.selenium.WebDriver
 import org.openqa.selenium.htmlunit.HtmlUnitDriver
 import org.scalatest._
@@ -60,8 +62,28 @@ class UISeleniumSuite
     ssc
   }
 
+  private def setupStreams(ssc: StreamingContext): Unit = {
+    val rdds = Queue(ssc.sc.parallelize(1 to 4, 4))
+    val inputStream = ssc.queueStream(rdds)
+    inputStream.foreachRDD { rdd =>
+      rdd.foreach(_ => {})
+      rdd.foreach(_ => {})
+    }
+    inputStream.foreachRDD { rdd =>
+      rdd.foreach(_ => {})
+      try {
+        rdd.foreach(_ => throw new RuntimeException("Oops"))
+      } catch {
+        case e: SparkException if e.getMessage.contains("Oops") =>
+      }
+    }
+  }
+
   test("attaching and detaching a Streaming tab") {
     withStreamingContext(newSparkStreamingContext()) { ssc =>
+      setupStreams(ssc)
+      ssc.start()
+
       val sparkUI = ssc.sparkContext.ui.get
 
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
@@ -77,8 +99,8 @@ class UISeleniumSuite
         statisticText should contain("Batch interval:")
 
         val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
-        h4Text should contain("Active Batches (0)")
-        h4Text should contain("Completed Batches (last 0 out of 0)")
+        h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
+        h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
         findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
           List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
@@ -86,6 +108,63 @@ class UISeleniumSuite
         findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
           List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
         }
+
+        val batchLinks =
+          findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
+        batchLinks.size should be >= 1
+
+        // Check a normal batch page
+        go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
+        val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
+        summaryText should contain ("Batch Duration:")
+        summaryText should contain ("Input data size:")
+        summaryText should contain ("Scheduling delay:")
+        summaryText should contain ("Processing time:")
+        summaryText should contain ("Total delay:")
+
+        findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
+          List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
+            "Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
+        }
+
+        // Check we have 2 output op ids
+        val outputOpIds = findAll(cssSelector(".output-op-id-cell")).toSeq
+        outputOpIds.map(_.attribute("rowspan")) should be (List(Some("2"), Some("2")))
+        outputOpIds.map(_.text) should be (List("0", "1"))
+
+        // Check job ids
+        val jobIdCells = findAll(cssSelector( """#batch-job-table a""")).toSeq
+        jobIdCells.map(_.text) should be (List("0", "1", "2", "3"))
+
+        val jobLinks = jobIdCells.flatMap(_.attribute("href"))
+        jobLinks.size should be (4)
+
+        // Check stage progress
+        findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be
+          (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+
+        // Check job progress
+        findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be
+          (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+
+        // Check stacktrace
+        val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq
+        errorCells should have size 1
+        errorCells(0) should include("java.lang.RuntimeException: Oops")
+
+        // Check the job link in the batch page is right
+        go to (jobLinks(0))
+        val jobDetails = findAll(cssSelector("li strong")).map(_.text).toSeq
+        jobDetails should contain("Status:")
+        jobDetails should contain("Completed Stages:")
+
+        // Check a batch page without id
+        go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/")
+        webDriver.getPageSource should include ("Missing id parameter")
+
+        // Check a non-exist batch
+        go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/?id=12345")
+        webDriver.getPageSource should include ("does not exist")
       }
 
       ssc.stop(false)

http://git-wip-us.apache.org/repos/asf/spark/blob/1b7106b8/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 94b1985..fa89536 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.streaming.ui
 
+import java.util.Properties
+
 import org.scalatest.Matchers
 
+import org.apache.spark.scheduler.SparkListenerJobStart
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.{Duration, Time, Milliseconds, TestSuiteBase}
@@ -28,6 +31,17 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
   val input = (1 to 4).map(Seq(_)).toSeq
   val operation = (d: DStream[Int]) => d.map(x => x)
 
+  private def createJobStart(
+      batchTime: Time, outputOpId: Int, jobId: Int): SparkListenerJobStart = {
+    val properties = new Properties()
+    properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, batchTime.milliseconds.toString)
+    properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, outputOpId.toString)
+    SparkListenerJobStart(jobId = jobId,
+      0L, // unused
+      Nil, // unused
+      properties)
+  }
+
   override def batchDuration: Duration = Milliseconds(100)
 
   test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
@@ -43,7 +57,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     // onBatchSubmitted
     val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
-    listener.waitingBatches should be (List(batchInfoSubmitted))
+    listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
     listener.runningBatches should be (Nil)
     listener.retainedCompletedBatches should be (Nil)
     listener.lastCompletedBatch should be (None)
@@ -56,7 +70,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
     listener.waitingBatches should be (Nil)
-    listener.runningBatches should be (List(batchInfoStarted))
+    listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
     listener.retainedCompletedBatches should be (Nil)
     listener.lastCompletedBatch should be (None)
     listener.numUnprocessedBatches should be (1)
@@ -64,13 +78,40 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.numTotalProcessedRecords should be (0)
     listener.numTotalReceivedRecords should be (600)
 
+    // onJobStart
+    val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
+    listener.onJobStart(jobStart1)
+
+    val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
+    listener.onJobStart(jobStart2)
+
+    val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
+    listener.onJobStart(jobStart3)
+
+    val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
+    listener.onJobStart(jobStart4)
+
+    val batchUIData = listener.getBatchUIData(Time(1000))
+    batchUIData should not be None
+    batchUIData.get.batchTime should be (batchInfoStarted.batchTime)
+    batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
+    batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
+    batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
+    batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+    batchUIData.get.numRecords should be(600)
+    batchUIData.get.outputOpIdSparkJobIdPairs should be
+      Seq(OutputOpIdAndSparkJobId(0, 0),
+        OutputOpIdAndSparkJobId(0, 1),
+        OutputOpIdAndSparkJobId(1, 0),
+        OutputOpIdAndSparkJobId(1, 1))
+
     // onBatchCompleted
     val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (Nil)
-    listener.retainedCompletedBatches should be (List(batchInfoCompleted))
-    listener.lastCompletedBatch should be (Some(batchInfoCompleted))
+    listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
+    listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
     listener.numUnprocessedBatches should be (0)
     listener.numTotalCompletedBatches should be (1)
     listener.numTotalProcessedRecords should be (600)
@@ -116,4 +157,55 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.retainedCompletedBatches.size should be (limit)
     listener.numTotalCompletedBatches should be(limit + 10)
   }
+
+  test("out-of-order onJobStart and onBatchXXX") {
+    val ssc = setupStreams(input, operation)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    // fulfill completedBatchInfos
+    for(i <- 0 until limit) {
+      val batchInfoCompleted =
+        BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+      val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
+      listener.onJobStart(jobStart)
+    }
+
+    // onJobStart happens before onBatchSubmitted
+    val jobStart = createJobStart(Time(1000 + limit * 100), outputOpId = 0, jobId = 0)
+    listener.onJobStart(jobStart)
+
+    val batchInfoSubmitted =
+      BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
+    listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+
+    // We still can see the info retrieved from onJobStart
+    val batchUIData = listener.getBatchUIData(Time(1000 + limit * 100))
+    batchUIData should not be None
+    batchUIData.get.batchTime should be (batchInfoSubmitted.batchTime)
+    batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
+    batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
+    batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
+    batchUIData.get.receiverNumRecords should be (Map.empty)
+    batchUIData.get.numRecords should be (0)
+    batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
+
+    // A lot of "onBatchCompleted"s happen before "onJobStart"
+    for(i <- limit + 1 to limit * 2) {
+      val batchInfoCompleted =
+        BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    }
+
+    for(i <- limit + 1 to limit * 2) {
+      val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
+      listener.onJobStart(jobStart)
+    }
+
+    // We should not leak memory
+    listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
+      (listener.waitingBatches.size + listener.runningBatches.size +
+        listener.retainedCompletedBatches.size + 10)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org