Posted to commits@spark.apache.org by td...@apache.org on 2015/10/16 22:53:11 UTC

spark git commit: [SPARK-10974] [STREAMING] Add progress bar for output operation column and use red dots for failed batches

Repository: spark
Updated Branches:
  refs/heads/master 3d683a139 -> 369d786f5


[SPARK-10974] [STREAMING] Add progress bar for output operation column and use red dots for failed batches

Screenshot:
https://cloud.githubusercontent.com/assets/1000778/10342571/385d9340-6d4c-11e5-8e79-1fa4c3c98f81.png

Also fixed the description and duration for output operations that don't have Spark jobs.
https://cloud.githubusercontent.com/assets/1000778/10342775/4bd52a0e-6d4d-11e5-99bc-26265a9fc792.png

Author: zsxwing <zs...@gmail.com>

Closes #9010 from zsxwing/output-op-progress-bar.
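
For context, the per-output-operation status surfaced by this change is also observable from user code through the existing StreamingListener callbacks. A minimal sketch, assuming an already-created StreamingContext named `ssc` (the listener and event types are existing Spark APIs; the logging is illustrative):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerOutputOperationCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onOutputOperationCompleted(
      completed: StreamingListenerOutputOperationCompleted): Unit = {
    val info = completed.outputOperationInfo
    // failureReason is the field added by this commit: None on success,
    // Some(stackTrace) when the output operation failed.
    info.failureReason.foreach { reason =>
      println(s"Output op ${info.id} of batch ${info.batchTime} failed: $reason")
    }
  }
})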


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/369d786f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/369d786f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/369d786f

Branch: refs/heads/master
Commit: 369d786f58580e7df73e7e23f27390d37269d0de
Parents: 3d683a1
Author: zsxwing <zs...@gmail.com>
Authored: Fri Oct 16 13:53:06 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Oct 16 13:53:06 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/ui/static/streaming-page.js |  26 +--
 .../apache/spark/streaming/DStreamGraph.scala   |   2 +-
 .../spark/streaming/scheduler/BatchInfo.scala   |  23 +--
 .../apache/spark/streaming/scheduler/Job.scala  |  30 +++-
 .../streaming/scheduler/JobScheduler.scala      |  12 +-
 .../spark/streaming/scheduler/JobSet.scala      |  17 +-
 .../scheduler/OutputOperationInfo.scala         |   6 +-
 .../spark/streaming/ui/AllBatchesTable.scala    |  40 +++--
 .../apache/spark/streaming/ui/BatchPage.scala   | 174 ++++++++-----------
 .../apache/spark/streaming/ui/BatchUIData.scala |  67 ++++++-
 .../ui/StreamingJobProgressListener.scala       |  14 ++
 .../streaming/StreamingListenerSuite.scala      |  16 +-
 .../spark/streaming/UISeleniumSuite.scala       |   2 +-
 .../ui/StreamingJobProgressListenerSuite.scala  |  30 ++--
 14 files changed, 258 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
----------------------------------------------------------------------
diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
index 4886b68..f82323a 100644
--- a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
+++ b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
@@ -154,34 +154,40 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
     var lastClickedBatch = null;
     var lastTimeout = null;
 
+    function isFailedBatch(batchTime) {
+        return $("#batch-" + batchTime).attr("isFailed") == "true";
+    }
+
     // Add points to the line, but make them invisible at first. When the user moves the mouse
     // over a point, it is displayed with its details.
     svg.selectAll(".point")
         .data(data)
         .enter().append("circle")
-            .attr("stroke", "white") // white and opacity = 0 make it invisible
-            .attr("fill", "white")
-            .attr("opacity", "0")
+            .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) // white and opacity = 0 make it invisible
+            .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";})
+            .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";})
             .style("cursor", "pointer")
             .attr("cx", function(d) { return x(d.x); })
             .attr("cy", function(d) { return y(d.y); })
-            .attr("r", function(d) { return 3; })
+            .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";})
             .on('mouseover', function(d) {
                 var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
                 showBootstrapTooltip(d3.select(this).node(), tip);
                 // show the point
                 d3.select(this)
-                    .attr("stroke", "steelblue")
-                    .attr("fill", "steelblue")
-                    .attr("opacity", "1");
+                    .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "steelblue";})
+                    .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "steelblue";})
+                    .attr("opacity", "1")
+                    .attr("r", "3");
             })
             .on('mouseout',  function() {
                 hideBootstrapTooltip(d3.select(this).node());
                 // hide the point
                 d3.select(this)
-                    .attr("stroke", "white")
-                    .attr("fill", "white")
-                    .attr("opacity", "0");
+                    .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";})
+                    .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";})
+                    .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";})
+                    .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";});
             })
             .on("click", function(d) {
                 if (lastTimeout != null) {
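
The `isFailedBatch` helper above reads a plain HTML attribute rather than extra JSON: the Scala side stamps each batch-time cell with an `isFailed` flag, as shown in the AllBatchesTable change later in this diff. A minimal sketch of that round trip (the batch time value is illustrative):

// Scala XML literal in the style of AllBatchesTable; the timeline JS reads
// the attribute back with $("#batch-" + batchTime).attr("isFailed") == "true".
val batchTime = 1445034791000L
val isFailed = true
val cell = <td id={s"batch-$batchTime"} isFailed={isFailed.toString}>batch link</td>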

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index de79c9e..1b0b789 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -113,7 +113,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     val jobs = this.synchronized {
       outputStreams.flatMap { outputStream =>
         val jobOption = outputStream.generateJob(time)
-        jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm))
+        jobOption.foreach(_.setCallSite(outputStream.creationSite))
         jobOption
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 463f899..436eb0a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
  *                        the streaming scheduler queue
  * @param processingStartTime Clock time of when the first job of this batch started processing
  * @param processingEndTime Clock time of when the last job of this batch finished processing
+ * @param outputOperationInfos The output operations in this batch
  */
 @DeveloperApi
 case class BatchInfo(
@@ -36,13 +37,10 @@ case class BatchInfo(
     streamIdToInputInfo: Map[Int, StreamInputInfo],
     submissionTime: Long,
     processingStartTime: Option[Long],
-    processingEndTime: Option[Long]
+    processingEndTime: Option[Long],
+    outputOperationInfos: Map[Int, OutputOperationInfo]
   ) {
 
-  private var _failureReasons: Map[Int, String] = Map.empty
-
-  private var _numOutputOp: Int = 0
-
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
 
@@ -72,19 +70,4 @@ case class BatchInfo(
    */
   def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
 
-  /** Set the failure reasons corresponding to every output ops in the batch */
-  private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
-    _failureReasons = reasons
-  }
-
-  /** Failure reasons corresponding to every output ops in the batch */
-  private[streaming] def failureReasons = _failureReasons
-
-  /** Set the number of output operations in this batch */
-  private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
-    _numOutputOp = numOutputOp
-  }
-
-  /** Return the number of output operations in this batch */
-  private[streaming] def numOutputOp: Int = _numOutputOp
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 1373053..ab1b356 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.streaming.scheduler
 
+import scala.util.{Failure, Try}
+
 import org.apache.spark.streaming.Time
-import scala.util.Try
+import org.apache.spark.util.{Utils, CallSite}
 
 /**
  * Class representing a Spark computation. It may contain multiple Spark jobs.
@@ -29,7 +31,9 @@ class Job(val time: Time, func: () => _) {
   private var _outputOpId: Int = _
   private var isSet = false
   private var _result: Try[_] = null
-  private var _callSite: String = "Unknown"
+  private var _callSite: CallSite = null
+  private var _startTime: Option[Long] = None
+  private var _endTime: Option[Long] = None
 
   def run() {
     _result = Try(func())
@@ -71,11 +75,29 @@ class Job(val time: Time, func: () => _) {
     _outputOpId = outputOpId
   }
 
-  def setCallSite(callSite: String): Unit = {
+  def setCallSite(callSite: CallSite): Unit = {
     _callSite = callSite
   }
 
-  def callSite: String = _callSite
+  def callSite: CallSite = _callSite
+
+  def setStartTime(startTime: Long): Unit = {
+    _startTime = Some(startTime)
+  }
+
+  def setEndTime(endTime: Long): Unit = {
+    _endTime = Some(endTime)
+  }
+
+  def toOutputOperationInfo: OutputOperationInfo = {
+    val failureReason = if (_result != null && _result.isFailure) {
+      Some(Utils.exceptionString(_result.asInstanceOf[Failure[_]].exception))
+    } else {
+      None
+    }
+    OutputOperationInfo(
+      time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
+  }
 
   override def toString: String = id
 }
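
The failure-reason extraction in `toOutputOperationInfo` follows the standard Try pattern. A standalone sketch of the same logic (the commit itself renders the exception with `Utils.exceptionString`; `toString` stands in here):

import scala.util.{Failure, Try}

def failureReasonOf(result: Try[_]): Option[String] = result match {
  case Failure(e) => Some(e.toString) // the commit uses Utils.exceptionString(e)
  case _          => None
}

assert(failureReasonOf(Try(1 / 0)).isDefined) // ArithmeticException captured
assert(failureReasonOf(Try(42)).isEmpty)      // successful op, no failure reason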

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0a4a396..2480b4e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -20,13 +20,13 @@ package org.apache.spark.streaming.scheduler
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
-import scala.util.{Failure, Success}
+import scala.util.Failure
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -162,16 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       // correct "jobSet.processingStartTime".
       listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
-    listenerBus.post(StreamingListenerOutputOperationStarted(
-      OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
+    job.setStartTime(startTime)
+    listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
 
   private def handleJobCompletion(job: Job, completedTime: Long) {
     val jobSet = jobSets.get(job.time)
     jobSet.handleJobCompletion(job)
-    listenerBus.post(StreamingListenerOutputOperationCompleted(
-      OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
+    job.setEndTime(completedTime)
+    listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
       jobSets.remove(jobSet.time)

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 08f63cc..f763003 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -64,24 +64,13 @@ case class JobSet(
   }
 
   def toBatchInfo: BatchInfo = {
-    val failureReasons: Map[Int, String] = {
-      if (hasCompleted) {
-        jobs.filter(_.result.isFailure).map { job =>
-          (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
-        }.toMap
-      } else {
-        Map.empty
-      }
-    }
-    val binfo = new BatchInfo(
+    BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
       if (processingStartTime >= 0) Some(processingStartTime) else None,
-      if (processingEndTime >= 0) Some(processingEndTime) else None
+      if (processingEndTime >= 0) Some(processingEndTime) else None,
+      jobs.map { job => (job.outputOpId, job.toOutputOperationInfo) }.toMap
     )
-    binfo.setFailureReason(failureReasons)
-    binfo.setNumOutputOp(jobs.size)
-    binfo
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
index d5614b3..137e512 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -25,17 +25,21 @@ import org.apache.spark.streaming.Time
  * Class having information on output operations.
  * @param batchTime Time of the batch
  * @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param name The name of this output operation.
  * @param description The description of this output operation.
  * @param startTime Clock time of when the output operation started processing
  * @param endTime Clock time of when the output operation finished processing
+ * @param failureReason Failure reason if this output operation fails
  */
 @DeveloperApi
 case class OutputOperationInfo(
     batchTime: Time,
     id: Int,
+    name: String,
     description: String,
     startTime: Option[Long],
-    endTime: Option[Long]) {
+    endTime: Option[Long],
+    failureReason: Option[String]) {
 
   /**
    * Return the duration of this output operation.

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 3e6590d..125cafd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.streaming.ui
 
-import java.text.SimpleDateFormat
-import java.util.Date
-
 import scala.xml.Node
 
 import org.apache.spark.ui.{UIUtils => SparkUIUtils}
@@ -46,7 +43,8 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
     val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
     val batchTimeId = s"batch-$batchTime"
 
-    <td id={batchTimeId} sorttable_customkey={batchTime.toString}>
+    <td id={batchTimeId} sorttable_customkey={batchTime.toString}
+        isFailed={batch.isFailed.toString}>
       <a href={s"batch?id=$batchTime"}>
         {formattedBatchTime}
       </a>
@@ -75,6 +73,19 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
     batchTable
   }
 
+  protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+    <td class="progress-cell">
+      {
+      SparkUIUtils.makeProgressBar(
+        started = batch.numActiveOutputOp,
+        completed = batch.numCompletedOutputOp,
+        failed = batch.numFailedOutputOp,
+        skipped = 0,
+        total = batch.outputOperations.size)
+      }
+    </td>
+  }
+
   /**
    * Return HTML for all rows of this table.
    */
@@ -86,7 +97,10 @@ private[ui] class ActiveBatchTable(
     waitingBatches: Seq[BatchUIData],
     batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
 
-  override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
+  override protected def columns: Seq[Node] = super.columns ++ {
+    <th>Output Ops: Succeeded/Total</th>
+      <th>Status</th>
+  }
 
   override protected def renderRows: Seq[Node] = {
     // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
@@ -96,11 +110,11 @@ private[ui] class ActiveBatchTable(
   }
 
   private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ <td>processing</td>
+    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td>
   }
 
   private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ <td>queued</td>
+    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>
   }
 }
 
@@ -119,17 +133,11 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
   private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val numFailedOutputOp = batch.failureReason.size
-    val outputOpColumn = if (numFailedOutputOp > 0) {
-        s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
-          s" (${numFailedOutputOp} failed)"
-      } else {
-        s"${batch.numOutputOp}/${batch.numOutputOp}"
-      }
-    baseRow(batch) ++
+
+    baseRow(batch) ++ {
       <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
         {formattedTotalDelay}
       </td>
-      <td>{outputOpColumn}</td>
+    } ++ createOutputOperationProgressBar(batch)
   }
 }
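
The four arguments to `makeProgressBar` come from the new counters on `BatchUIData`, added later in this diff; each reduces to a `count` over the output operations. An illustrative recount with a simplified op record (hypothetical data):

case class Op(endTime: Option[Long], failureReason: Option[String])

val ops = Seq(
  Op(Some(1L), None),         // completed successfully
  Op(None, None),             // still running
  Op(Some(2L), Some("boom"))  // finished with a failure
)
val failed    = ops.count(_.failureReason.nonEmpty)                              // 1
val active    = ops.count(_.endTime.isEmpty)                                     // 1
val completed = ops.count(op => op.failureReason.isEmpty && op.endTime.nonEmpty) // 1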

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index a19b85a..2ed9255 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -47,32 +47,30 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   }
 
   private def generateJobRow(
-      outputOpId: OutputOpId,
+      outputOpData: OutputOperationUIData,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
-      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: SparkJobIdWithUIData): Seq[Node] = {
     if (sparkJob.jobUIData.isDefined) {
-      generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
-        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+      generateNormalJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
     } else {
-      generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
-        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+      generateDroppedJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
     }
   }
 
   private def generateOutputOpRowWithoutSparkJobs(
-    outputOpId: OutputOpId,
+    outputOpData: OutputOperationUIData,
     outputOpDescription: Seq[Node],
-    formattedOutputOpDuration: String,
-    outputOpStatus: String): Seq[Node] = {
+    formattedOutputOpDuration: String): Seq[Node] = {
     <tr>
-      <td class="output-op-id-cell" >{outputOpId.toString}</td>
+      <td class="output-op-id-cell" >{outputOpData.id.toString}</td>
       <td>{outputOpDescription}</td>
       <td>{formattedOutputOpDuration}</td>
-      {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+      {outputOpStatusCell(outputOpData, rowspan = 1)}
       <!-- Job Id -->
       <td>-</td>
       <!-- Duration -->
@@ -91,10 +89,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
    * one cell, we use "rowspan" for the first row of an output op.
    */
   private def generateNormalJobRow(
-      outputOpId: OutputOpId,
+      outputOpData: OutputOperationUIData,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
-      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
@@ -116,12 +113,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     // scalastyle:off
     val prefixCells =
       if (isFirstRow) {
-        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpData.id.toString}</td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
           {outputOpDescription}
         </td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
-        {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
+        {outputOpStatusCell(outputOpData, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -161,10 +158,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
    * with "-" cells.
    */
   private def generateDroppedJobRow(
-      outputOpId: OutputOpId,
+      outputOpData: OutputOperationUIData,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
-      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       jobId: Int): Seq[Node] = {
@@ -173,10 +169,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     // scalastyle:off
     val prefixCells =
       if (isFirstRow) {
-        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpData.id.toString}</td>
           <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
           <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
-          {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
+          {outputOpStatusCell(outputOpData, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -199,47 +195,34 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   }
 
   private def generateOutputOpIdRow(
-      outputOpId: OutputOpId,
-      outputOpStatus: String,
+      outputOpData: OutputOperationUIData,
       sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
-    // We don't count the durations of dropped jobs
-    val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
-      map(sparkJob => {
-        sparkJob.submissionTime.map { start =>
-          val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-          end - start
-        }
-      })
     val formattedOutputOpDuration =
-      if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
-        // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
+      if (outputOpData.duration.isEmpty) {
         "-"
       } else {
-        SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
+        SparkUIUtils.formatDuration(outputOpData.duration.get)
       }
 
-    val description = generateOutputOpDescription(sparkJobs)
+    val description = generateOutputOpDescription(outputOpData)
 
     if (sparkJobs.isEmpty) {
-      generateOutputOpRowWithoutSparkJobs(
-        outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+      generateOutputOpRowWithoutSparkJobs(outputOpData, description, formattedOutputOpDuration)
     } else {
       val firstRow =
         generateJobRow(
-          outputOpId,
+          outputOpData,
           description,
           formattedOutputOpDuration,
-          outputOpStatus,
           sparkJobs.size,
           true,
           sparkJobs.head)
       val tailRows =
         sparkJobs.tail.map { sparkJob =>
           generateJobRow(
-            outputOpId,
+            outputOpData,
             description,
             formattedOutputOpDuration,
-            outputOpStatus,
             sparkJobs.size,
             false,
             sparkJob)
@@ -248,35 +231,18 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     }
   }
 
-  private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
-    val lastStageInfo =
-      sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
-        flatMap { sparkJob => // For the first job, get the latest Stage info
-          if (sparkJob.stageIds.isEmpty) {
-            None
-          } else {
-            sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
-          }
-        }
-    lastStageInfo match {
-      case Some(stageInfo) =>
-        val details = if (stageInfo.details.nonEmpty) {
-          <span
-            onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
-            class="expand-details">
-              +details
-          </span> ++
-          <div class="stage-details collapsed">
-            <pre>{stageInfo.details}</pre>
-          </div>
-        } else {
-          NodeSeq.Empty
-        }
-
-        <div> {stageInfo.name} {details} </div>
-      case None =>
-        Text("(Unknown)")
-    }
+  private def generateOutputOpDescription(outputOp: OutputOperationUIData): Seq[Node] = {
+    <div>
+      {outputOp.name}
+      <span
+        onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+        class="expand-details">
+          +details
+      </span>
+      <div class="stage-details collapsed">
+        <pre>{outputOp.description}</pre>
+      </div>
+    </div>
   }
 
   private def failureReasonCell(
@@ -329,6 +295,19 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     }
   }
 
+  private def generateOutputOperationStatusForUI(failure: String): String = {
+    if (failure.startsWith("org.apache.spark.SparkException")) {
+      "Failed due to Spark job error\n" + failure
+    } else {
+      var nextLineIndex = failure.indexOf("\n")
+      if (nextLineIndex < 0) {
+        nextLineIndex = failure.size
+      }
+      val firstLine = failure.substring(0, nextLineIndex)
+      s"Failed due to error: $firstLine\n$failure"
+    }
+  }
+
   /**
    * Generate the job table for the batch.
    */
@@ -338,26 +317,15 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         // sort SparkJobIds for each OutputOpId
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
-    val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
-      val status = batchUIData.failureReason.get(outputOpId).map { failure =>
-        if (failure.startsWith("org.apache.spark.SparkException")) {
-          "Failed due to Spark job error\n" + failure
-        } else {
-          var nextLineIndex = failure.indexOf("\n")
-          if (nextLineIndex < 0) {
-            nextLineIndex = failure.size
-          }
-          val firstLine = failure.substring(0, nextLineIndex)
-          s"Failed due to error: $firstLine\n$failure"
-        }
-      }.getOrElse("Succeeded")
-      val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
-      (outputOpId, status, sparkJobIds)
-    }
+
+    val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] =
+      batchUIData.outputOperations.map { case (outputOpId, outputOperation) =>
+        val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
+        (outputOperation, sparkJobIds)
+      }.toSeq.sortBy(_._1.id)
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
-        outputOps.map { case (outputOpId, status, sparkJobIds) =>
-          (outputOpId, status,
+      val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) =>
+          (outputOpData,
             sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
         }
 
@@ -367,9 +335,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         </thead>
         <tbody>
           {
-            outputOpIdWithJobs.map {
-              case (outputOpId, status, sparkJobIds) =>
-                generateOutputOpIdRow(outputOpId, status, sparkJobIds)
+            outputOpWithJobs.map { case (outputOpData, sparkJobIds) =>
+              generateOutputOpIdRow(outputOpData, sparkJobIds)
             }
           }
         </tbody>
@@ -377,7 +344,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     }
   }
 
-  def render(request: HttpServletRequest): Seq[Node] = {
+  def render(request: HttpServletRequest): Seq[Node] = streamingListener.synchronized {
     val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
       throw new IllegalArgumentException(s"Missing id parameter")
     }
@@ -430,14 +397,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         </ul>
       </div>
 
-    val jobTable =
-      if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) {
-        <div>Cannot find any job for Batch {formattedBatchTime}.</div>
-      } else {
-        generateJobTable(batchUIData)
-      }
-
-    val content = summary ++ jobTable
+    val content = summary ++ generateJobTable(batchUIData)
 
     SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
   }
@@ -471,11 +431,17 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
   }
 
-  private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
-    if (status == "Succeeded") {
-      <td rowspan={rowspan.toString}>Succeeded</td>
-    } else {
-      failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
+  private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: Int): Seq[Node] = {
+    outputOp.failureReason match {
+      case Some(failureReason) =>
+        val failureReasonForUI = generateOutputOperationStatusForUI(failureReason)
+        failureReasonCell(failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
+      case None =>
+        if (outputOp.endTime.isEmpty) {
+          <td rowspan={rowspan.toString}>-</td>
+        } else {
+          <td rowspan={rowspan.toString}>Succeeded</td>
+        }
     }
   }
 }
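
The extracted `generateOutputOperationStatusForUI` prepends a one-line summary to the full stack trace, which `failureReasonCell` then renders with the trace collapsed. A self-contained sketch of the same string handling (`takeWhile` is an equivalent way to take the first line):

def statusForUI(failure: String): String =
  if (failure.startsWith("org.apache.spark.SparkException")) {
    "Failed due to Spark job error\n" + failure
  } else {
    val firstLine = failure.takeWhile(_ != '\n') // whole string if single-line
    s"Failed due to error: $firstLine\n$failure"
  }

println(statusForUI("java.io.IOException: disk full\n\tat ..."))
// Failed due to error: java.io.IOException: disk full
// java.io.IOException: disk full
// 	at ...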

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index e6c2e21..3ef3689 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -18,8 +18,10 @@
 
 package org.apache.spark.streaming.ui
 
+import scala.collection.mutable
+
 import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.{BatchInfo, OutputOperationInfo, StreamInputInfo}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener._
 
 private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
@@ -30,8 +32,7 @@ private[ui] case class BatchUIData(
     val submissionTime: Long,
     val processingStartTime: Option[Long],
     val processingEndTime: Option[Long],
-    val numOutputOp: Int,
-    val failureReason: Map[Int, String],
+    val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(),
     var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
 
   /**
@@ -61,19 +62,75 @@ private[ui] case class BatchUIData(
   * The number of records received by the receivers in this batch.
    */
   def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
+
+  /**
+   * Update the information of an output operation in this batch.
+   */
+  def updateOutputOperationInfo(outputOperationInfo: OutputOperationInfo): Unit = {
+    assert(batchTime == outputOperationInfo.batchTime)
+    outputOperations(outputOperationInfo.id) = OutputOperationUIData(outputOperationInfo)
+  }
+
+  /**
+   * Return the number of failed output operations.
+   */
+  def numFailedOutputOp: Int = outputOperations.values.count(_.failureReason.nonEmpty)
+
+  /**
+   * Return the number of running output operations.
+   */
+  def numActiveOutputOp: Int = outputOperations.values.count(_.endTime.isEmpty)
+
+  /**
+   * Return the number of completed output operations.
+   */
+  def numCompletedOutputOp: Int = outputOperations.values.count {
+      op => op.failureReason.isEmpty && op.endTime.nonEmpty
+    }
+
+  /**
+   * Return whether any output operation in this batch failed.
+   */
+  def isFailed: Boolean = numFailedOutputOp != 0
 }
 
 private[ui] object BatchUIData {
 
   def apply(batchInfo: BatchInfo): BatchUIData = {
+    val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]()
+    outputOperations ++= batchInfo.outputOperationInfos.mapValues(OutputOperationUIData.apply)
     new BatchUIData(
       batchInfo.batchTime,
       batchInfo.streamIdToInputInfo,
       batchInfo.submissionTime,
       batchInfo.processingStartTime,
       batchInfo.processingEndTime,
-      batchInfo.numOutputOp,
-      batchInfo.failureReasons
+      outputOperations
+    )
+  }
+}
+
+private[ui] case class OutputOperationUIData(
+    id: OutputOpId,
+    name: String,
+    description: String,
+    startTime: Option[Long],
+    endTime: Option[Long],
+    failureReason: Option[String]) {
+
+  def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
+}
+
+private[ui] object OutputOperationUIData {
+
+  def apply(outputOperationInfo: OutputOperationInfo): OutputOperationUIData = {
+    OutputOperationUIData(
+      outputOperationInfo.id,
+      outputOperationInfo.name,
+      outputOperationInfo.description,
+      outputOperationInfo.startTime,
+      outputOperationInfo.endTime,
+      outputOperationInfo.failureReason
     )
   }
 }
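
The `duration` for-comprehension above only yields a value once both timestamps are present, which is why in-flight output operations show "-" on the batch page. The same logic in isolation:

def duration(startTime: Option[Long], endTime: Option[Long]): Option[Long] =
  for (s <- startTime; e <- endTime) yield e - s

assert(duration(Some(1000L), Some(1500L)) == Some(500L))
assert(duration(Some(1000L), None) == None) // still running: no duration yet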

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 78aeb00..f6cc6ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -119,6 +119,20 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     }
   }
 
+  override def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = synchronized {
+    // This method is called after onBatchStarted
+    runningBatchUIData(outputOperationStarted.outputOperationInfo.batchTime).
+      updateOutputOperationInfo(outputOperationStarted.outputOperationInfo)
+  }
+
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
+    // This method is called before onBatchCompleted
+    runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
+      updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
+  }
+
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
     getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
       var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 2b43b74..5dc0472 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
 
@@ -221,7 +221,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       }
     }
     _ssc.stop()
-    failureReasonsCollector.failureReasons
+    failureReasonsCollector.failureReasons.toMap
   }
 
   /** Check if a sequence of numbers is in increasing order */
@@ -307,14 +307,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
 }
 
 /**
- * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the `failureReasons`
- * field.
+ * A StreamingListener that records the latest failure reason of each output operation in a batch.
  */
 class FailureReasonsCollector extends StreamingListener {
 
-  @volatile var failureReasons: Map[Int, String] = null
+  val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
 
-  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-    failureReasons = batchCompleted.batchInfo.failureReasons
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
+    outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
+      failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index d1df788..a5744a9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -117,7 +117,7 @@ class UISeleniumSuite
 
         findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
           List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
-            "Status")
+            "Output Ops: Succeeded/Total", "Status")
         }
         findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
           List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",

http://git-wip-us.apache.org/repos/asf/spark/blob/369d786f/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 995f119..af4718b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -63,7 +63,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
 
     // onBatchSubmitted
-    val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
+    val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
     listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
     listener.runningBatches should be (Nil)
@@ -75,7 +75,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.numTotalReceivedRecords should be (0)
 
     // onBatchStarted
-    val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+    val batchInfoStarted =
+      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
@@ -116,7 +117,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
         OutputOpIdAndSparkJobId(1, 1))
 
     // onBatchCompleted
-    val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+    val batchInfoCompleted =
+      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (Nil)
@@ -156,7 +158,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
 
-    val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+    val batchInfoCompleted =
+      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
 
     for(_ <- 0 until (limit + 10)) {
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -173,8 +176,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // fulfill completedBatchInfos
     for(i <- 0 until limit) {
-      val batchInfoCompleted =
-        BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+      val batchInfoCompleted = BatchInfo(
+          Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
       val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
       listener.onJobStart(jobStart)
@@ -185,7 +188,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.onJobStart(jobStart)
 
     val batchInfoSubmitted =
-      BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
+      BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
     // We still can see the info retrieved from onJobStart
@@ -201,8 +204,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // A lot of "onBatchCompleted"s happen before "onJobStart"
     for(i <- limit + 1 to limit * 2) {
-      val batchInfoCompleted =
-        BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+      val batchInfoCompleted = BatchInfo(
+          Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
 
@@ -227,11 +230,13 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
 
       // onBatchSubmitted
-      val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
+      val batchInfoSubmitted =
+        BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
       listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
       // onBatchStarted
-      val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+      val batchInfoStarted =
+        BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
       listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
 
       // onJobStart
@@ -248,7 +253,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       listener.onJobStart(jobStart4)
 
       // onBatchCompleted
-      val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+      val batchInfoCompleted =
+        BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
 

