You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/04/15 01:00:40 UTC

spark git commit: [SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted (backport to branch 1.3)

Repository: spark
Updated Branches:
  refs/heads/branch-1.3 db2154d7d -> 1ab423f6e


[SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted (backport to branch 1.3)

Backport SPARK-6766 #5414 to branch 1.3

Conflicts:

	streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Author: zsxwing <zs...@gmail.com>

Closes #5452 from zsxwing/SPARK-6766-branch-1.3 and squashes the following commits:

cb87e44 [zsxwing] [SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted (backport to branch 1.3)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ab423f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ab423f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ab423f6

Branch: refs/heads/branch-1.3
Commit: 1ab423f6e6a303bb84b426db4870d28ff53de254
Parents: db2154d
Author: zsxwing <zs...@gmail.com>
Authored: Tue Apr 14 16:00:30 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Apr 14 16:00:30 2015 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      |   8 +-
 .../ui/StreamingJobProgressListener.scala       |  16 +--
 .../streaming/StreamingListenerSuite.scala      |  55 +++++++--
 .../ui/StreamingJobProgressListenerSuite.scala  | 119 +++++++++++++++++++
 4 files changed, 180 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ab423f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b3ffc71..5b0eb78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -105,6 +105,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
     } else {
+      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
       logInfo("Added jobs for time " + jobSet.time)
@@ -134,10 +135,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private def handleJobStart(job: Job) {
     val jobSet = jobSets.get(job.time)
-    if (!jobSet.hasStarted) {
+    val isFirstJobOfJobSet = !jobSet.hasStarted
+    jobSet.handleJobStart(job)
+    if (isFirstJobOfJobSet) {
+      // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
+      // correct "jobSet.processingStartTime".
       listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
-    jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1ab423f6/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 5ee53a5..49afeda 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -32,7 +32,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]
-  private val completedaBatchInfos = new Queue[BatchInfo]
+  private val completedBatchInfos = new Queue[BatchInfo]
   private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
@@ -60,7 +60,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
-    runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+    waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
   }
 
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
@@ -75,8 +75,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
     waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
     runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-    completedaBatchInfos.enqueue(batchCompleted.batchInfo)
-    if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+    completedBatchInfos.enqueue(batchCompleted.batchInfo)
+    if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
     totalCompletedBatches += 1L
 
     batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
@@ -113,7 +113,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
-    completedaBatchInfos.toSeq
+    completedBatchInfos.toSeq
   }
 
   def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -160,7 +160,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def lastCompletedBatch: Option[BatchInfo] = {
-    completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
   }
 
   def lastReceivedBatch: Option[BatchInfo] = {
@@ -169,10 +169,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   private def retainedBatches: Seq[BatchInfo] = synchronized {
     (waitingBatchInfos.values.toSeq ++
-      runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+      runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
   }
 
   private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
-    Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1ab423f6/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index f52562b..b055938 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -46,10 +46,38 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val collector = new BatchInfoCollector
     ssc.addStreamingListener(collector)
     runStreams(ssc, input.size, input.size)
-    val batchInfos = collector.batchInfos
-    batchInfos should have size 4
 
-    batchInfos.foreach(info => {
+    // SPARK-6766: batch info should be submitted
+    val batchInfosSubmitted = collector.batchInfosSubmitted
+    batchInfosSubmitted should have size 4
+
+    batchInfosSubmitted.foreach(info => {
+      info.schedulingDelay should be (None)
+      info.processingDelay should be (None)
+      info.totalDelay should be (None)
+    })
+
+    isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
+
+    // SPARK-6766: processingStartTime of batch info should not be None when starting
+    val batchInfosStarted = collector.batchInfosStarted
+    batchInfosStarted should have size 4
+
+    batchInfosStarted.foreach(info => {
+      info.schedulingDelay should not be None
+      info.schedulingDelay.get should be >= 0L
+      info.processingDelay should be (None)
+      info.totalDelay should be (None)
+    })
+
+    isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
+
+    // test onBatchCompleted
+    val batchInfosCompleted = collector.batchInfosCompleted
+    batchInfosCompleted should have size 4
+
+    batchInfosCompleted.foreach(info => {
       info.schedulingDelay should not be None
       info.processingDelay should not be None
       info.totalDelay should not be None
@@ -58,9 +86,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       info.totalDelay.get should be >= 0L
     })
 
-    isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
-    isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
-    isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)
   }
 
   test("receiver info reporting") {
@@ -99,9 +127,20 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
 /** Listener that collects information on processed batches */
 class BatchInfoCollector extends StreamingListener {
-  val batchInfos = new ArrayBuffer[BatchInfo]
+  val batchInfosCompleted = new ArrayBuffer[BatchInfo]
+  val batchInfosStarted = new ArrayBuffer[BatchInfo]
+  val batchInfosSubmitted = new ArrayBuffer[BatchInfo]
+
+  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
+    batchInfosSubmitted += batchSubmitted.batchInfo
+  }
+
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
+    batchInfosStarted += batchStarted.batchInfo
+  }
+
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    batchInfos += batchCompleted.batchInfo
+    batchInfosCompleted += batchCompleted.batchInfo
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1ab423f6/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
new file mode 100644
index 0000000..2b9d164
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.scalatest.Matchers
+
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.{Time, Milliseconds, TestSuiteBase}
+
+class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
+
+  val input = (1 to 4).map(Seq(_)).toSeq
+  val operation = (d: DStream[Int]) => d.map(x => x)
+
+  override def batchDuration = Milliseconds(100)
+
+  test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
+    "onReceiverStarted, onReceiverError, onReceiverStopped") {
+    val ssc = setupStreams(input, operation)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val receivedBlockInfo = Map(
+      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
+      1 -> Array(ReceivedBlockInfo(1, 300, null))
+    )
+
+    // onBatchSubmitted
+    val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
+    listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+    listener.waitingBatches should be (List(batchInfoSubmitted))
+    listener.runningBatches should be (Nil)
+    listener.retainedCompletedBatches should be (Nil)
+    listener.lastCompletedBatch should be (None)
+    listener.numUnprocessedBatches should be (1)
+    listener.numTotalCompletedBatches should be (0)
+    listener.numTotalProcessedRecords should be (0)
+    listener.numTotalReceivedRecords should be (0)
+
+    // onBatchStarted
+    val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+    listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
+    listener.waitingBatches should be (Nil)
+    listener.runningBatches should be (List(batchInfoStarted))
+    listener.retainedCompletedBatches should be (Nil)
+    listener.lastCompletedBatch should be (None)
+    listener.numUnprocessedBatches should be (1)
+    listener.numTotalCompletedBatches should be (0)
+    listener.numTotalProcessedRecords should be (0)
+    listener.numTotalReceivedRecords should be (600)
+
+    // onBatchCompleted
+    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+    listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    listener.waitingBatches should be (Nil)
+    listener.runningBatches should be (Nil)
+    listener.retainedCompletedBatches should be (List(batchInfoCompleted))
+    listener.lastCompletedBatch should be (Some(batchInfoCompleted))
+    listener.numUnprocessedBatches should be (0)
+    listener.numTotalCompletedBatches should be (1)
+    listener.numTotalProcessedRecords should be (600)
+    listener.numTotalReceivedRecords should be (600)
+
+    // onReceiverStarted
+    val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost")
+    listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (None)
+
+    // onReceiverError
+    val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost")
+    listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (Some(receiverInfoError))
+    listener.receiverInfo(2) should be (None)
+
+    // onReceiverStopped
+    val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost")
+    listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (Some(receiverInfoError))
+    listener.receiverInfo(2) should be (Some(receiverInfoStopped))
+    listener.receiverInfo(3) should be (None)
+  }
+
+  test("Remove the old completed batches when exceeding the limit") {
+    val ssc = setupStreams(input, operation)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val receivedBlockInfo = Map(
+      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
+      1 -> Array(ReceivedBlockInfo(1, 300, null))
+    )
+    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+
+    for(_ <- 0 until (limit + 10)) {
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    }
+
+    listener.retainedCompletedBatches.size should be (limit)
+    listener.numTotalCompletedBatches should be(limit + 10)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org