You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/04/10 10:51:48 UTC

spark git commit: [SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted

Repository: spark
Updated Branches:
  refs/heads/master 3290d2d13 -> 18ca089be


[SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted

This PR includes:

1. Send `StreamingListenerBatchSubmitted` when `JobSet` is submitted
2. Fix `StreamingListenerBatchStarted.batchInfo.processingStartTime`
3. Fix a typo: `completedaBatchInfos` -> `completedBatchInfos`

Author: zsxwing <zs...@gmail.com>

Closes #5414 from zsxwing/SPARK-6766 and squashes the following commits:

2f85060 [zsxwing] Update tests
ca0955b [zsxwing] Combine unit tests
79b4fed [zsxwing] Add StreamingJobProgressListenerSuite to test StreamingJobProgressListener
fc3a2a1 [zsxwing] Add unit tests for SPARK-6766
74aed99 [zsxwing] Refactor as per TD's suggestion
493f978 [zsxwing] Send StreamingListenerBatchSubmitted when JobSet is submitted; fix StreamingListenerBatchStarted.batchInfo.processingStartTime; fix a typo


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18ca089b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18ca089b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18ca089b

Branch: refs/heads/master
Commit: 18ca089bed41ce3e87deeb14206317863518c12c
Parents: 3290d2d
Author: zsxwing <zs...@gmail.com>
Authored: Fri Apr 10 01:51:42 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Apr 10 01:51:42 2015 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      |   8 +-
 .../ui/StreamingJobProgressListener.scala       |  16 +--
 .../streaming/StreamingListenerSuite.scala      |  55 +++++++--
 .../ui/StreamingJobProgressListenerSuite.scala  | 119 +++++++++++++++++++
 4 files changed, 180 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18ca089b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index d6a93ac..95f1857 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -105,6 +105,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
     } else {
+      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
       logInfo("Added jobs for time " + jobSet.time)
@@ -134,10 +135,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private def handleJobStart(job: Job) {
     val jobSet = jobSets.get(job.time)
-    if (!jobSet.hasStarted) {
+    val isFirstJobOfJobSet = !jobSet.hasStarted
+    jobSet.handleJobStart(job)
+    if (isFirstJobOfJobSet) {
+      // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
+      // correct "jobSet.processingStartTime".
       listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
-    jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/18ca089b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index e4bd067..84f80e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -33,7 +33,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]
-  private val completedaBatchInfos = new Queue[BatchInfo]
+  private val completedBatchInfos = new Queue[BatchInfo]
   private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
@@ -62,7 +62,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
     synchronized {
-      runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+      waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
     }
   }
 
@@ -79,8 +79,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     synchronized {
       waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
       runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-      completedaBatchInfos.enqueue(batchCompleted.batchInfo)
-      if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+      completedBatchInfos.enqueue(batchCompleted.batchInfo)
+      if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
       totalCompletedBatches += 1L
 
       batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
@@ -118,7 +118,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
-    completedaBatchInfos.toSeq
+    completedBatchInfos.toSeq
   }
 
   def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -165,7 +165,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def lastCompletedBatch: Option[BatchInfo] = {
-    completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
   }
 
   def lastReceivedBatch: Option[BatchInfo] = {
@@ -174,10 +174,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   private def retainedBatches: Seq[BatchInfo] = synchronized {
     (waitingBatchInfos.values.toSeq ++
-      runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+      runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
   }
 
   private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
-    Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18ca089b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 852e8bb..7210439 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -46,10 +46,38 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val collector = new BatchInfoCollector
     ssc.addStreamingListener(collector)
     runStreams(ssc, input.size, input.size)
-    val batchInfos = collector.batchInfos
-    batchInfos should have size 4
 
-    batchInfos.foreach(info => {
+    // SPARK-6766: batch info should be submitted
+    val batchInfosSubmitted = collector.batchInfosSubmitted
+    batchInfosSubmitted should have size 4
+
+    batchInfosSubmitted.foreach(info => {
+      info.schedulingDelay should be (None)
+      info.processingDelay should be (None)
+      info.totalDelay should be (None)
+    })
+
+    isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
+
+    // SPARK-6766: processingStartTime of batch info should not be None when starting
+    val batchInfosStarted = collector.batchInfosStarted
+    batchInfosStarted should have size 4
+
+    batchInfosStarted.foreach(info => {
+      info.schedulingDelay should not be None
+      info.schedulingDelay.get should be >= 0L
+      info.processingDelay should be (None)
+      info.totalDelay should be (None)
+    })
+
+    isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
+
+    // test onBatchCompleted
+    val batchInfosCompleted = collector.batchInfosCompleted
+    batchInfosCompleted should have size 4
+
+    batchInfosCompleted.foreach(info => {
       info.schedulingDelay should not be None
       info.processingDelay should not be None
       info.totalDelay should not be None
@@ -58,9 +86,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       info.totalDelay.get should be >= 0L
     })
 
-    isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
-    isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
-    isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)
   }
 
   test("receiver info reporting") {
@@ -99,9 +127,20 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
 /** Listener that collects information on processed batches */
 class BatchInfoCollector extends StreamingListener {
-  val batchInfos = new ArrayBuffer[BatchInfo]
+  val batchInfosCompleted = new ArrayBuffer[BatchInfo]
+  val batchInfosStarted = new ArrayBuffer[BatchInfo]
+  val batchInfosSubmitted = new ArrayBuffer[BatchInfo]
+
+  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
+    batchInfosSubmitted += batchSubmitted.batchInfo
+  }
+
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
+    batchInfosStarted += batchStarted.batchInfo
+  }
+
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    batchInfos += batchCompleted.batchInfo
+    batchInfosCompleted += batchCompleted.batchInfo
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/18ca089b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
new file mode 100644
index 0000000..2b9d164
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.scalatest.Matchers
+
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.{Time, Milliseconds, TestSuiteBase}
+
+class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
+
+  val input = (1 to 4).map(Seq(_)).toSeq
+  val operation = (d: DStream[Int]) => d.map(x => x)
+
+  override def batchDuration = Milliseconds(100)
+
+  test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
+    "onReceiverStarted, onReceiverError, onReceiverStopped") {
+    val ssc = setupStreams(input, operation)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val receivedBlockInfo = Map(
+      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
+      1 -> Array(ReceivedBlockInfo(1, 300, null))
+    )
+
+    // onBatchSubmitted
+    val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
+    listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+    listener.waitingBatches should be (List(batchInfoSubmitted))
+    listener.runningBatches should be (Nil)
+    listener.retainedCompletedBatches should be (Nil)
+    listener.lastCompletedBatch should be (None)
+    listener.numUnprocessedBatches should be (1)
+    listener.numTotalCompletedBatches should be (0)
+    listener.numTotalProcessedRecords should be (0)
+    listener.numTotalReceivedRecords should be (0)
+
+    // onBatchStarted
+    val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+    listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
+    listener.waitingBatches should be (Nil)
+    listener.runningBatches should be (List(batchInfoStarted))
+    listener.retainedCompletedBatches should be (Nil)
+    listener.lastCompletedBatch should be (None)
+    listener.numUnprocessedBatches should be (1)
+    listener.numTotalCompletedBatches should be (0)
+    listener.numTotalProcessedRecords should be (0)
+    listener.numTotalReceivedRecords should be (600)
+
+    // onBatchCompleted
+    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+    listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    listener.waitingBatches should be (Nil)
+    listener.runningBatches should be (Nil)
+    listener.retainedCompletedBatches should be (List(batchInfoCompleted))
+    listener.lastCompletedBatch should be (Some(batchInfoCompleted))
+    listener.numUnprocessedBatches should be (0)
+    listener.numTotalCompletedBatches should be (1)
+    listener.numTotalProcessedRecords should be (600)
+    listener.numTotalReceivedRecords should be (600)
+
+    // onReceiverStarted
+    val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost")
+    listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (None)
+
+    // onReceiverError
+    val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost")
+    listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (Some(receiverInfoError))
+    listener.receiverInfo(2) should be (None)
+
+    // onReceiverStopped
+    val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost")
+    listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (Some(receiverInfoError))
+    listener.receiverInfo(2) should be (Some(receiverInfoStopped))
+    listener.receiverInfo(3) should be (None)
+  }
+
+  test("Remove the old completed batches when exceeding the limit") {
+    val ssc = setupStreams(input, operation)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val receivedBlockInfo = Map(
+      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
+      1 -> Array(ReceivedBlockInfo(1, 300, null))
+    )
+    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+
+    for(_ <- 0 until (limit + 10)) {
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    }
+
+    listener.retainedCompletedBatches.size should be (limit)
+    listener.numTotalCompletedBatches should be(limit + 10)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org