Posted to commits@spark.apache.org by td...@apache.org on 2015/04/29 04:32:05 UTC

spark git commit: [SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records to BlockGenerator with single callback

Repository: spark
Updated Branches:
  refs/heads/master d36e67350 -> 5c8f4bd5f


[SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records to BlockGenerator with single callback

This lets receivers that receive data in small batches (like Kinesis) add a whole batch of records to the BlockGenerator while having the callback function called only once. It is for internal use only, in support of an improvement to the Kinesis Receiver that we are planning.

Author: Tathagata Das <ta...@gmail.com>

Closes #5695 from tdas/SPARK-7138 and squashes the following commits:

a35cf7d [Tathagata Das] Fixed style.
a7a4cb9 [Tathagata Das] Added extra method to BlockGenerator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c8f4bd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c8f4bd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c8f4bd5

Branch: refs/heads/master
Commit: 5c8f4bd5fae539ab5fb992573d5357ed34e2f4d0
Parents: d36e673
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Apr 28 19:31:57 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Apr 28 19:31:57 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/receiver/BlockGenerator.scala     | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c8f4bd5/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index f4963a7..4bebcc5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -126,6 +126,20 @@ private[streaming] class BlockGenerator(
     listener.onAddData(data, metadata)
   }
 
+  /**
+   * Push multiple data items into the buffer. After buffering the data, the
+   * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+   * will be periodically pushed into BlockManager. Note that all the data items are guaranteed
+   * to be present in a single block.
+   */
+  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
+    dataIterator.foreach { data =>
+      waitToPush()
+      currentBuffer += data
+    }
+    listener.onAddData(dataIterator, metadata)
+  }
+
   /** Change the buffer to which single records are added. */
   private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
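
The pattern this method introduces (buffer many records, then fire the callback once) can be sketched outside Spark as follows. This is a minimal illustration under stated assumptions, not the committed code: `SketchListener`, `SketchBlockGenerator`, `bufferedCount` and `SketchDemo` are hypothetical names, and rate limiting (`waitToPush`) and block creation are left out. One deliberate difference from the diff above: there the iterator is consumed by `foreach` before it is handed to `listener.onAddData`, and a Scala `Iterator` can be traversed only once, so this sketch copies the records into a collection first and passes that collection to the callback.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockGeneratorListener; only the callback used here.
trait SketchListener {
  def onAddData(data: Any, metadata: Any): Unit
}

// Hypothetical, simplified model of the buffering side of BlockGenerator.
// Rate limiting (waitToPush) and block creation are intentionally omitted.
class SketchBlockGenerator(listener: SketchListener) {
  private val currentBuffer = new ArrayBuffer[Any]

  // Single-record pattern: one record buffered, one callback.
  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
    currentBuffer += data
    listener.onAddData(data, metadata)
  }

  // Multi-record pattern: many records buffered, one callback.
  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit =
    synchronized {
      // An Iterator is single-pass, so copy the records before using them twice.
      val records = dataIterator.toVector
      currentBuffer ++= records
      listener.onAddData(records, metadata)
    }

  // Number of records currently held in the buffer.
  def bufferedCount: Int = synchronized { currentBuffer.size }
}

object SketchDemo {
  def main(args: Array[String]): Unit = {
    var callbacks = 0
    val listener = new SketchListener {
      def onAddData(data: Any, metadata: Any): Unit = callbacks += 1
    }
    val generator = new SketchBlockGenerator(listener)
    generator.addMultipleDataWithCallback(Iterator("a", "b", "c"), "batch-1")
    println(s"buffered=${generator.bufferedCount}, callbacks=$callbacks")
  }
}
```

Holding the lock for the whole call is what keeps the batch contiguous in the buffer in this sketch; whether that trade-off is acceptable when pushing is rate limited depends on the receiver.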

