You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/07 03:07:04 UTC

spark git commit: [SPARK-7405] [STREAMING] Fix the bug that ReceiverInputDStream doesn't report InputInfo

Repository: spark
Updated Branches:
  refs/heads/master 71a452b61 -> 14502d5e5


[SPARK-7405] [STREAMING] Fix the bug that ReceiverInputDStream doesn't report InputInfo

The bug was caused by SPARK-7139, which unintentionally removed some code added by SPARK-7112 that reports InputInfo to the InputInfoTracker; the removal can be seen here: https://github.com/apache/spark/commit/1854ac326a9cc6014817d8df30ed0458eee5d7d1#diff-5c8651dd78abd20439b8eb938175075dL72

This PR restores the removed code and adds assertions to the tests to verify that InputInfo is reported again.

Author: zsxwing <zs...@gmail.com>

Closes #5950 from zsxwing/SPARK-7405 and squashes the following commits:

675f5d9 [zsxwing] Fix the bug that ReceiverInputDStream doesn't report InputInfo


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14502d5e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14502d5e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14502d5e

Branch: refs/heads/master
Commit: 14502d5e569f8460d3e2c973a4277441307433cc
Parents: 71a452b
Author: zsxwing <zs...@gmail.com>
Authored: Wed May 6 18:07:00 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed May 6 18:07:00 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/dstream/ReceiverInputDStream.scala | 5 +++++
 .../scala/org/apache/spark/streaming/InputStreamsSuite.scala  | 7 +++++++
 2 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14502d5e/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 15d9710..5cfe43a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,6 +24,7 @@ import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.InputInfo
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
@@ -68,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
         val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
+        // Register the input blocks information into InputInfoTracker
+        val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
+        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
         // Are WAL record handles present with all the blocks
         val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14502d5e/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6074532..93e6b0c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -50,6 +50,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
       // Set up the streaming context and input streams
       withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.addStreamingListener(ssc.progressListener)
+
         val input = Seq(1, 2, 3, 4, 5)
         // Use "batchCount" to make sure we check the result after all batches finish
         val batchCounter = new BatchCounter(ssc)
@@ -72,6 +74,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
           fail("Timeout: cannot finish all batches in 30 seconds")
         }
+
+        // Verify all "InputInfo"s have been reported
+        assert(ssc.progressListener.numTotalReceivedRecords === input.size)
+        assert(ssc.progressListener.numTotalProcessedRecords === input.size)
+
         logInfo("Stopping server")
         testServer.stop()
         logInfo("Stopping context")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org