You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/10/06 04:23:46 UTC

spark git commit: [SPARK-10900] [STREAMING] Add output operation events to StreamingListener

Repository: spark
Updated Branches:
  refs/heads/master a609eb20d -> be7c5ff1a


[SPARK-10900] [STREAMING] Add output operation events to StreamingListener

Add output operation events to StreamingListener so as to implement the following UI features:

1. Progress bar of a batch in the batch list.
2. Display an output operation's `description` and `duration` even when the streaming job runs no Spark job.

Author: zsxwing <zs...@gmail.com>

Closes #8958 from zsxwing/output-operation-events.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be7c5ff1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be7c5ff1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be7c5ff1

Branch: refs/heads/master
Commit: be7c5ff1ad02ce1c03113c98656a4e0c0c3cee83
Parents: a609eb2
Author: zsxwing <zs...@gmail.com>
Authored: Mon Oct 5 19:23:41 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Oct 5 19:23:41 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/DStreamGraph.scala   |  6 ++-
 .../apache/spark/streaming/scheduler/Job.scala  |  7 ++++
 .../streaming/scheduler/JobScheduler.scala      | 20 +++++----
 .../scheduler/OutputOperationInfo.scala         | 44 ++++++++++++++++++++
 .../streaming/scheduler/StreamingListener.scala | 16 +++++++
 .../scheduler/StreamingListenerBus.scala        |  4 ++
 .../streaming/StreamingListenerSuite.scala      | 37 ++++++++++++++++
 7 files changed, 125 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index ebbcb6b..de79c9e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -111,7 +111,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
     val jobs = this.synchronized {
-      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+      outputStreams.flatMap { outputStream =>
+        val jobOption = outputStream.generateJob(time)
+        jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm))
+        jobOption
+      }
     }
     logDebug("Generated " + jobs.length + " jobs for time " + time)
     jobs

http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 3c481bf..1373053 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -29,6 +29,7 @@ class Job(val time: Time, func: () => _) {
   private var _outputOpId: Int = _
   private var isSet = false
   private var _result: Try[_] = null
+  private var _callSite: String = "Unknown"
 
   def run() {
     _result = Try(func())
@@ -70,5 +71,11 @@ class Job(val time: Time, func: () => _) {
     _outputOpId = outputOpId
   }
 
+  def setCallSite(callSite: String): Unit = {
+    _callSite = callSite
+  }
+
+  def callSite: String = _callSite
+
   override def toString: String = id
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 66afbf1..0a4a396 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -30,8 +30,8 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
-private[scheduler] case class JobStarted(job: Job) extends JobSchedulerEvent
-private[scheduler] case class JobCompleted(job: Job) extends JobSchedulerEvent
+private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
+private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
 private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent
 
 /**
@@ -143,8 +143,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private def processEvent(event: JobSchedulerEvent) {
     try {
       event match {
-        case JobStarted(job) => handleJobStart(job)
-        case JobCompleted(job) => handleJobCompletion(job)
+        case JobStarted(job, startTime) => handleJobStart(job, startTime)
+        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
         case ErrorReported(m, e) => handleError(m, e)
       }
     } catch {
@@ -153,7 +153,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
   }
 
-  private def handleJobStart(job: Job) {
+  private def handleJobStart(job: Job, startTime: Long) {
     val jobSet = jobSets.get(job.time)
     val isFirstJobOfJobSet = !jobSet.hasStarted
     jobSet.handleJobStart(job)
@@ -162,12 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       // correct "jobSet.processingStartTime".
       listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
+    listenerBus.post(StreamingListenerOutputOperationStarted(
+      OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
 
-  private def handleJobCompletion(job: Job) {
+  private def handleJobCompletion(job: Job, completedTime: Long) {
     val jobSet = jobSets.get(job.time)
     jobSet.handleJobCompletion(job)
+    listenerBus.post(StreamingListenerOutputOperationCompleted(
+      OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
       jobSets.remove(jobSet.time)
@@ -210,7 +214,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
         // it's possible that when `post` is called, `eventLoop` happens to null.
         var _eventLoop = eventLoop
         if (_eventLoop != null) {
-          _eventLoop.post(JobStarted(job))
+          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
           // Disable checks for existing output directories in jobs launched by the streaming
           // scheduler, since we may need to write output to an existing directory during checkpoint
           // recovery; see SPARK-4835 for more details.
@@ -219,7 +223,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           }
           _eventLoop = eventLoop
           if (_eventLoop != null) {
-            _eventLoop.post(JobCompleted(job))
+            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
           }
         } else {
           // JobScheduler has been stopped.

http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
new file mode 100644
index 0000000..d5614b3
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.Time
+
+/**
+ * :: DeveloperApi ::
+ * Class having information on output operations.
+ * @param batchTime Time of the batch
+ * @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param description The description of this output operation.
+ * @param startTime Clock time of when the output operation started processing
+ * @param endTime Clock time of when the output operation finished processing
+ */
+@DeveloperApi
+case class OutputOperationInfo(
+    batchTime: Time,
+    id: Int,
+    description: String,
+    startTime: Option[Long],
+    endTime: Option[Long]) {
+
+  /**
+   * Return the duration of this output operation.
+   */
+  def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 74dbba4..d19bdbb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -39,6 +39,14 @@ case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends Streami
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
 @DeveloperApi
+case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
+  extends StreamingListenerEvent
+
+@DeveloperApi
+case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
+  extends StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
 
@@ -75,6 +83,14 @@ trait StreamingListener {
 
   /** Called when processing of a batch of jobs has completed. */
   def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+  /** Called when processing of a job of a batch has started. */
+  def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted) { }
+
+  /** Called when processing of a job of a batch has completed. */
+  def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index b07d6cf..ca111bb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -43,6 +43,10 @@ private[spark] class StreamingListenerBus
         listener.onBatchStarted(batchStarted)
       case batchCompleted: StreamingListenerBatchCompleted =>
         listener.onBatchCompleted(batchCompleted)
+      case outputOperationStarted: StreamingListenerOutputOperationStarted =>
+        listener.onOutputOperationStarted(outputOperationStarted)
+      case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
+        listener.onOutputOperationCompleted(outputOperationCompleted)
       case _ =>
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/be7c5ff1/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index d8fd2ce..2b43b74 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,6 +140,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
   }
 
+  test("output operation reporting") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count())
+    inputStream.foreachRDD(_.collect())
+    inputStream.foreachRDD(_.count())
+
+    val collector = new OutputOperationInfoCollector
+    ssc.addStreamingListener(collector)
+
+    ssc.start()
+    try {
+      eventually(timeout(30 seconds), interval(20 millis)) {
+        collector.startedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+        collector.completedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+
   test("onBatchCompleted with successful batch") {
     ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
@@ -254,6 +275,22 @@ class ReceiverInfoCollector extends StreamingListener {
   }
 }
 
+/** Listener that collects information on processed output operations */
+class OutputOperationInfoCollector extends StreamingListener {
+  val startedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+  val completedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+
+  override def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
+    startedOutputOperationIds += outputOperationStarted.outputOperationInfo.id
+  }
+
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
+    completedOutputOperationIds += outputOperationCompleted.outputOperationInfo.id
+  }
+}
+
 class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
   def onStart() {
     Future {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org