You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/12/22 20:51:43 UTC

spark git commit: [SPARK-18537][WEB UI] Add a REST api to serve spark streaming information

Repository: spark
Updated Branches:
  refs/heads/master 31da755c8 -> ce99f51d2


[SPARK-18537][WEB UI] Add a REST api to serve spark streaming information

## What changes were proposed in this pull request?

This PR continues the work started in #16000 and completes #15904.

**Description**

- Augment the `org.apache.spark.status.api.v1` package for serving streaming information.
- Retrieve the streaming information through StreamingJobProgressListener.

> this API should cover exactly the same amount of information as you can get from the web interface
> the implementation is based on the current REST implementation of spark-core
> and will be available for running applications only
>
> https://issues.apache.org/jira/browse/SPARK-18537

## How was this patch tested?

Local test.

Author: saturday_s <sh...@gmail.com>
Author: Chan Chor Pang <Ch...@access-company.com>
Author: peterCPChan <un...@gmail.com>

Closes #16253 from saturday-shi/SPARK-18537.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce99f51d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce99f51d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce99f51d

Branch: refs/heads/master
Commit: ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc
Parents: 31da755
Author: saturday_s <sh...@gmail.com>
Authored: Thu Dec 22 12:51:37 2016 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Dec 22 12:51:37 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/SparkUI.scala     |  8 ++
 project/MimaExcludes.scala                      |  6 +-
 .../status/api/v1/streaming/BatchStatus.java    | 30 ++++++++
 .../api/v1/streaming/AllBatchesResource.scala   | 78 ++++++++++++++++++++
 .../streaming/AllOutputOperationsResource.scala | 66 +++++++++++++++++
 .../api/v1/streaming/AllReceiversResource.scala | 76 +++++++++++++++++++
 .../api/v1/streaming/ApiStreamingApp.scala      | 42 +++++++++++
 .../v1/streaming/ApiStreamingRootResource.scala | 74 +++++++++++++++++++
 .../api/v1/streaming/OneBatchResource.scala     | 35 +++++++++
 .../streaming/OneOutputOperationResource.scala  | 39 ++++++++++
 .../api/v1/streaming/OneReceiverResource.scala  | 35 +++++++++
 .../streaming/StreamingStatisticsResource.scala | 64 ++++++++++++++++
 .../spark/status/api/v1/streaming/api.scala     | 75 +++++++++++++++++++
 .../spark/streaming/StreamingContext.scala      |  5 +-
 .../api/java/JavaStreamingListener.scala        | 14 ++++
 .../api/java/JavaStreamingListenerWrapper.scala |  5 ++
 .../streaming/scheduler/StreamingListener.scala |  6 ++
 .../scheduler/StreamingListenerBus.scala        |  2 +
 .../ui/StreamingJobProgressListener.scala       | 12 ++-
 .../spark/streaming/ui/StreamingPage.scala      |  3 +-
 .../spark/streaming/ui/StreamingTab.scala       |  1 +
 .../JavaStreamingListenerAPISuite.java          |  5 ++
 .../JavaStreamingListenerWrapperSuite.scala     |  9 +++
 .../ui/StreamingJobProgressListenerSuite.scala  |  4 +
 24 files changed, 689 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b828532..7d31ac5 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -60,6 +60,8 @@ private[spark] class SparkUI private (
 
   var appId: String = _
 
+  private var streamingJobProgressListener: Option[SparkListener] = None
+
   /** Initialize all components of the server. */
   def initialize() {
     val jobsTab = new JobsTab(this)
@@ -124,6 +126,12 @@ private[spark] class SparkUI private (
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
     getApplicationInfoList.find(_.id == appId)
   }
+
+  def getStreamingJobProgressListener: Option[SparkListener] = streamingJobProgressListener
+
+  def setStreamingJobProgressListener(sparkListener: SparkListener): Unit = {
+    streamingJobProgressListener = Option(sparkListener)
+  }
 }
 
 private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 20f5c27..2314d7f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -38,8 +38,12 @@ object MimaExcludes {
   lazy val v22excludes = v21excludes ++ Seq(
     // [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"),
+
     // [SPARK-18949] [SQL] Add repairTable API to Catalog
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"),
+
+    // [SPARK-18537] Add a REST api to spark streaming
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted")
   )
 
   // Exclude rules for 2.1.x

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java b/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java
new file mode 100644
index 0000000..1bbca5a
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum BatchStatus {
+  COMPLETED,
+  QUEUED,
+  PROCESSING;
+
+  public static BatchStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(BatchStatus.class, str);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
new file mode 100644
index 0000000..3a51ae6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList}
+import javax.ws.rs.{GET, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.streaming.AllBatchesResource._
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = {
+    batchInfoList(listener, statusParams).sortBy(- _.batchId)
+  }
+}
+
+private[v1] object AllBatchesResource {
+
+  def batchInfoList(
+      listener: StreamingJobProgressListener,
+      statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = {
+
+    listener.synchronized {
+      val statuses =
+        if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams
+      val statusToBatches = Seq(
+        BatchStatus.COMPLETED -> listener.retainedCompletedBatches,
+        BatchStatus.QUEUED -> listener.waitingBatches,
+        BatchStatus.PROCESSING -> listener.runningBatches
+      )
+
+      val batchInfos = for {
+        (status, batches) <- statusToBatches
+        batch <- batches if statuses.contains(status)
+      } yield {
+        val batchId = batch.batchTime.milliseconds
+        val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
+
+        new BatchInfo(
+          batchId = batchId,
+          batchTime = new Date(batchId),
+          status = status.toString,
+          batchDuration = listener.batchDuration,
+          inputSize = batch.numRecords,
+          schedulingDelay = batch.schedulingDelay,
+          processingTime = batch.processingDelay,
+          totalDelay = batch.totalDelay,
+          numActiveOutputOps = batch.numActiveOutputOp,
+          numCompletedOutputOps = batch.numCompletedOutputOp,
+          numFailedOutputOps = batch.numFailedOutputOp,
+          numTotalOutputOps = batch.outputOperations.size,
+          firstFailureReason = firstFailureReason
+        )
+      }
+
+      batchInfos
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
new file mode 100644
index 0000000..0eb649f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = {
+    outputOperationInfoList(listener, batchId).sortBy(_.outputOpId)
+  }
+}
+
+private[v1] object AllOutputOperationsResource {
+
+  def outputOperationInfoList(
+      listener: StreamingJobProgressListener,
+      batchId: Long): Seq[OutputOperationInfo] = {
+
+    listener.synchronized {
+      listener.getBatchUIData(Time(batchId)) match {
+        case Some(batch) =>
+          for ((opId, op) <- batch.outputOperations) yield {
+            val jobIds = batch.outputOpIdSparkJobIdPairs
+              .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted
+
+            new OutputOperationInfo(
+              outputOpId = opId,
+              name = op.name,
+              description = op.description,
+              startTime = op.startTime.map(new Date(_)),
+              endTime = op.endTime.map(new Date(_)),
+              duration = op.duration,
+              failureReason = op.failureReason,
+              jobIds = jobIds
+            )
+          }
+        case None => throw new NotFoundException("unknown batch: " + batchId)
+      }
+    }.toSeq
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
new file mode 100644
index 0000000..5a276a9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.streaming.AllReceiversResource._
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def receiversList(): Seq[ReceiverInfo] = {
+    receiverInfoList(listener).sortBy(_.streamId)
+  }
+}
+
+private[v1] object AllReceiversResource {
+
+  def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = {
+    listener.synchronized {
+      listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) =>
+
+        val receiverInfo = listener.receiverInfo(streamId)
+        val streamName = receiverInfo.map(_.name)
+          .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
+        val avgEventRate =
+          if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size)
+
+        val (errorTime, errorMessage, error) = receiverInfo match {
+          case None => (None, None, None)
+          case Some(info) =>
+            val someTime =
+              if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None
+            val someMessage =
+              if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None
+            val someError =
+              if (info.lastError.length > 0) Some(info.lastError) else None
+
+            (someTime, someMessage, someError)
+        }
+
+        new ReceiverInfo(
+          streamId = streamId,
+          streamName = streamName,
+          isActive = receiverInfo.map(_.active),
+          executorId = receiverInfo.map(_.executorId),
+          executorHost = receiverInfo.map(_.location),
+          lastErrorTime = errorTime,
+          lastErrorMessage = errorMessage,
+          lastError = error,
+          avgEventRate = avgEventRate,
+          eventRates = eventRates
+        )
+      }.toSeq
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
new file mode 100644
index 0000000..e64830a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{Path, PathParam}
+
+import org.apache.spark.status.api.v1.UIRootFromServletContext
+
+@Path("/v1")
+private[v1] class ApiStreamingApp extends UIRootFromServletContext {
+
+  @Path("applications/{appId}/streaming")
+  def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new ApiStreamingRootResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/streaming")
+  def getStreamingRoot(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new ApiStreamingRootResource(ui)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
new file mode 100644
index 0000000..1ccd586
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.Path
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+import org.apache.spark.ui.SparkUI
+
+private[v1] class ApiStreamingRootResource(ui: SparkUI) {
+
+  import org.apache.spark.status.api.v1.streaming.ApiStreamingRootResource._
+
+  @Path("statistics")
+  def getStreamingStatistics(): StreamingStatisticsResource = {
+    new StreamingStatisticsResource(getListener(ui))
+  }
+
+  @Path("receivers")
+  def getReceivers(): AllReceiversResource = {
+    new AllReceiversResource(getListener(ui))
+  }
+
+  @Path("receivers/{streamId: \\d+}")
+  def getReceiver(): OneReceiverResource = {
+    new OneReceiverResource(getListener(ui))
+  }
+
+  @Path("batches")
+  def getBatches(): AllBatchesResource = {
+    new AllBatchesResource(getListener(ui))
+  }
+
+  @Path("batches/{batchId: \\d+}")
+  def getBatch(): OneBatchResource = {
+    new OneBatchResource(getListener(ui))
+  }
+
+  @Path("batches/{batchId: \\d+}/operations")
+  def getOutputOperations(): AllOutputOperationsResource = {
+    new AllOutputOperationsResource(getListener(ui))
+  }
+
+  @Path("batches/{batchId: \\d+}/operations/{outputOpId: \\d+}")
+  def getOutputOperation(): OneOutputOperationResource = {
+    new OneOutputOperationResource(getListener(ui))
+  }
+
+}
+
+private[v1] object ApiStreamingRootResource {
+  def getListener(ui: SparkUI): StreamingJobProgressListener = {
+    ui.getStreamingJobProgressListener match {
+      case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener]
+      case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
new file mode 100644
index 0000000..d3c689c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneBatchResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = {
+    val someBatch = AllBatchesResource.batchInfoList(listener)
+      .find { _.batchId == batchId }
+    someBatch.getOrElse(throw new NotFoundException("unknown batch: " + batchId))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
new file mode 100644
index 0000000..aabcdb2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneOutputOperationResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def oneOperation(
+      @PathParam("batchId") batchId: Long,
+      @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = {
+
+    val someOutputOp = AllOutputOperationsResource.outputOperationInfoList(listener, batchId)
+      .find { _.outputOpId == opId }
+    someOutputOp.getOrElse(throw new NotFoundException("unknown output operation: " + opId))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
new file mode 100644
index 0000000..c0cc99d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneReceiverResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = {
+    val someReceiver = AllReceiversResource.receiverInfoList(listener)
+      .find { _.streamId == streamId }
+    someReceiver.getOrElse(throw new NotFoundException("unknown receiver: " + streamId))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
new file mode 100644
index 0000000..6cff87b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class StreamingStatisticsResource(listener: StreamingJobProgressListener) {
+
+  @GET
+  def streamingStatistics(): StreamingStatistics = {
+    listener.synchronized {
+      val batches = listener.retainedBatches
+      val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration))
+      val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay))
+      val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay))
+      val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay))
+
+      new StreamingStatistics(
+        startTime = new Date(listener.startTime),
+        batchDuration = listener.batchDuration,
+        numReceivers = listener.numReceivers,
+        numActiveReceivers = listener.numActiveReceivers,
+        numInactiveReceivers = listener.numInactiveReceivers,
+        numTotalCompletedBatches = listener.numTotalCompletedBatches,
+        numRetainedCompletedBatches = listener.retainedCompletedBatches.size,
+        numActiveBatches = listener.numUnprocessedBatches,
+        numProcessedRecords = listener.numTotalProcessedRecords,
+        numReceivedRecords = listener.numTotalReceivedRecords,
+        avgInputRate = avgInputRate,
+        avgSchedulingDelay = avgSchedulingDelay,
+        avgProcessingTime = avgProcessingTime,
+        avgTotalDelay = avgTotalDelay
+      )
+    }
+  }
+
+  private def avgRate(data: Seq[Double]): Option[Double] = {
+    if (data.isEmpty) None else Some(data.sum / data.size)
+  }
+
+  private def avgTime(data: Seq[Long]): Option[Long] = {
+    if (data.isEmpty) None else Some(data.sum / data.size)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala
new file mode 100644
index 0000000..403b0eb
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+class StreamingStatistics private[spark]( // application-level summary of the streaming listener
+    val startTime: Date, // listener.startTime wrapped in a Date
+    val batchDuration: Long, // batch interval in milliseconds
+    val numReceivers: Int,
+    val numActiveReceivers: Int,
+    val numInactiveReceivers: Int,
+    val numTotalCompletedBatches: Long,
+    val numRetainedCompletedBatches: Long, // size of listener.retainedCompletedBatches
+    val numActiveBatches: Long, // listener.numUnprocessedBatches
+    val numProcessedRecords: Long, // listener.numTotalProcessedRecords
+    val numReceivedRecords: Long, // listener.numTotalReceivedRecords
+    val avgInputRate: Option[Double], // records/sec over retained batches; None if none retained
+    val avgSchedulingDelay: Option[Long], // mean over retained batches that report a value
+    val avgProcessingTime: Option[Long], // mean of processingDelay, same rule as above
+    val avgTotalDelay: Option[Long]) // mean of totalDelay, same rule as above
+
+class ReceiverInfo private[spark]( // per-receiver record of the status.api.v1.streaming package
+    val streamId: Int,
+    val streamName: String,
+    val isActive: Option[Boolean],
+    val executorId: Option[String],
+    val executorHost: Option[String],
+    val lastErrorTime: Option[Date],
+    val lastErrorMessage: Option[String],
+    val lastError: Option[String],
+    val avgEventRate: Option[Double],
+    val eventRates: Seq[(Long, Double)]) // presumably (time, rate) pairs -- TODO confirm
+
+class BatchInfo private[spark]( // per-batch record of the status.api.v1.streaming package
+    val batchId: Long,
+    val batchTime: Date,
+    val status: String, // presumably a BatchStatus name (BatchStatus.java) -- TODO confirm
+    val batchDuration: Long,
+    val inputSize: Long,
+    val schedulingDelay: Option[Long],
+    val processingTime: Option[Long],
+    val totalDelay: Option[Long],
+    val numActiveOutputOps: Int,
+    val numCompletedOutputOps: Int,
+    val numFailedOutputOps: Int,
+    val numTotalOutputOps: Int,
+    val firstFailureReason: Option[String])
+
+class OutputOperationInfo private[spark]( // per-output-operation record of this API package
+    val outputOpId: OutputOpId, // Int alias declared in StreamingJobProgressListener
+    val name: String,
+    val description: String,
+    val startTime: Option[Date],
+    val endTime: Option[Date],
+    val duration: Option[Long],
+    val failureReason: Option[String],
+    val jobIds: Seq[SparkJobId]) // SparkJobId is an Int alias from StreamingJobProgressListener

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 444261d..0a4c141 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -45,7 +45,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
+import org.apache.spark.streaming.scheduler.
+    {ExecutorAllocationManager, JobScheduler, StreamingListener, StreamingListenerStreamingStarted}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
 import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
 
@@ -583,6 +584,8 @@ class StreamingContext private[streaming] (
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
+            scheduler.listenerBus.post(
+              StreamingListenerStreamingStarted(System.currentTimeMillis()))
           } catch {
             case NonFatal(e) =>
               logError("Error starting the context, marking it as stopped", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index db0bae9..28cb86c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -21,6 +21,9 @@ import org.apache.spark.streaming.Time
 
 private[streaming] trait PythonStreamingListener{
 
+  /** Called when the streaming has been started */
+  def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted) { }
+
   /** Called when a receiver has been started */
   def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
 
@@ -51,6 +54,11 @@ private[streaming] trait PythonStreamingListener{
 private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
   extends JavaStreamingListener {
 
+  /** Called when the streaming has been started */
+  override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+    listener.onStreamingStarted(streamingStarted)
+  }
+
   /** Called when a receiver has been started */
   override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
     listener.onReceiverStarted(receiverStarted)
@@ -99,6 +107,9 @@ private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamin
  */
 private[streaming] class JavaStreamingListener {
 
+  /** Called when the streaming has been started */
+  def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { }
+
   /** Called when a receiver has been started */
   def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
 
@@ -131,6 +142,9 @@ private[streaming] class JavaStreamingListener {
  */
 private[streaming] sealed trait JavaStreamingListenerEvent
 
+private[streaming] class JavaStreamingListenerStreamingStarted(val time: Long)
+  extends JavaStreamingListenerEvent // Java-facing twin of StreamingListenerStreamingStarted
+
 private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo)
   extends JavaStreamingListenerEvent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index b109b9f..ee8370d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -77,6 +77,11 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
     )
   }
 
+  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
+    javaStreamingListener.onStreamingStarted( // re-wrap the Scala event for the Java listener
+      new JavaStreamingListenerStreamingStarted(streamingStarted.time))
+  }
+
   override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
     javaStreamingListener.onReceiverStarted(
       new JavaStreamingListenerReceiverStarted(toJavaReceiverInfo(receiverStarted.receiverInfo)))

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 58fc78d..b57f9b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -30,6 +30,9 @@ import org.apache.spark.util.Distribution
 sealed trait StreamingListenerEvent
 
 @DeveloperApi
+case class StreamingListenerStreamingStarted(time: Long) extends StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
 @DeveloperApi
@@ -66,6 +69,9 @@ case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
 @DeveloperApi
 trait StreamingListener {
 
+  /** Called when the streaming has been started */
+  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }
+
   /** Called when a receiver has been started */
   def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 39f6e71..5fb0bd0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -65,6 +65,8 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
         listener.onOutputOperationStarted(outputOperationStarted)
       case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
         listener.onOutputOperationCompleted(outputOperationCompleted)
+      case streamingStarted: StreamingListenerStreamingStarted =>
+        listener.onStreamingStarted(streamingStarted)
       case _ =>
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 61f852a..95f5821 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -27,7 +27,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.scheduler._
 
-private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
+private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   extends SparkListener with StreamingListener {
 
   private val waitingBatchUIData = new HashMap[Time, BatchUIData]
@@ -39,6 +39,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private var totalProcessedRecords = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]
 
+  @volatile private var _startTime = -1L // -1 until started; startTime reads it without a lock
+
   // Because onJobStart and onBatchXXX messages are processed in different threads,
   // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
   // cannot use a map of (Time, BatchUIData).
@@ -66,6 +68,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   val batchDuration = ssc.graph.batchDuration.milliseconds
 
+  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
+    this._startTime = streamingStarted.time // keep the timestamp carried by the event
+  }
+
   override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
     synchronized {
       receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo
@@ -152,6 +158,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     }
   }
 
+  def startTime: Long = _startTime // -1 until a StreamingListenerStreamingStarted event arrives
+
   def numReceivers: Int = synchronized {
     receiverInfos.size
   }
@@ -267,7 +275,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 }
 
-private[streaming] object StreamingJobProgressListener {
+private[spark] object StreamingJobProgressListener {
   type SparkJobId = Int
   type OutputOpId = Int
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 46cd309..7abafd6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -143,7 +143,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
   import StreamingPage._
 
   private val listener = parent.listener
-  private val startTime = System.currentTimeMillis()
+
+  private def startTime: Long = listener.startTime // asks the listener on every call
 
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index c5f8aad..9d1b82a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -38,6 +38,7 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
 
   ssc.addStreamingListener(listener)
   ssc.sc.addSparkListener(listener)
+  parent.setStreamingJobProgressListener(listener)
   attachPage(new StreamingPage(this))
   attachPage(new BatchPage(this))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index ff0be82..63fd6c4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -23,6 +23,11 @@ import org.apache.spark.streaming.api.java.*;
 public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
 
   @Override
+  public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) {
+    super.onStreamingStarted(streamingStarted);
+  }
+
+  @Override
   public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) {
     JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
     receiverInfo.streamId();

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 0295e05..cfd4323 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
     val listener = new TestJavaStreamingListener()
     val listenerWrapper = new JavaStreamingListenerWrapper(listener)
 
+    val streamingStarted = StreamingListenerStreamingStarted(1000L)
+    listenerWrapper.onStreamingStarted(streamingStarted)
+    assert(listener.streamingStarted.time === streamingStarted.time)
+
     val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
       streamId = 2,
       name = "test",
@@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
 
 class TestJavaStreamingListener extends JavaStreamingListener {
 
+  var streamingStarted: JavaStreamingListenerStreamingStarted = null
   var receiverStarted: JavaStreamingListenerReceiverStarted = null
   var receiverError: JavaStreamingListenerReceiverError = null
   var receiverStopped: JavaStreamingListenerReceiverStopped = null
@@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener {
   var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
   var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
 
+  override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+    this.streamingStarted = streamingStarted
+  }
+
   override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
     this.receiverStarted = receiverStarted
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce99f51d/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 46ab3ac..56b4008 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -62,6 +62,10 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       0 -> StreamInputInfo(0, 300L),
       1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
 
+    // onStreamingStarted
+    listener.onStreamingStarted(StreamingListenerStreamingStarted(100L))
+    listener.startTime should be (100)
+
     // onBatchSubmitted
     val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org