Posted to commits@spark.apache.org by td...@apache.org on 2014/07/09 19:45:51 UTC

git commit: [STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false

Repository: spark
Updated Branches:
  refs/heads/master 339441f54 -> 0eb11527d


[STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false

Fix QueueInputDStream, which was not removing consumed items from the queue when used with the oneAtATime flag disabled.
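
For context, the root cause is plain scala.collection.mutable.Queue semantics:
"buffer ++= queue" copies the queued elements without removing them, so with
oneAtATime disabled the same RDDs were re-delivered in every subsequent batch,
whereas dequeueAll(_ => true) drains the queue as it returns the elements.
A minimal standalone sketch (not part of this commit; the object name is
illustrative):

    import scala.collection.mutable

    object QueueDrainSketch extends App {
      val queue  = mutable.Queue("a", "b", "c")
      val buffer = mutable.ArrayBuffer.empty[String]

      // Old behavior: ++= copies the queue's elements but leaves them in
      // place, so the next read sees the same items again.
      buffer ++= queue
      println(s"after ++= queue:  buffer=$buffer queue=$queue")

      buffer.clear()
      // Fixed behavior: dequeueAll removes every element matching the
      // predicate and returns them, leaving the queue empty.
      buffer ++= queue.dequeueAll(_ => true)
      println(s"after dequeueAll: buffer=$buffer queue=$queue")
    }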

Author: Manuel Laflamme <ma...@gmail.com>

Closes #1285 from mlaflamm/spark-2343 and squashes the following commits:

61c9e38 [Manuel Laflamme] Unit tests for queue input stream
c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eb11527
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eb11527
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eb11527

Branch: refs/heads/master
Commit: 0eb11527d13083ced215e3fda44ed849198a57cb
Parents: 339441f
Author: Manuel Laflamme <ma...@gmail.com>
Authored: Wed Jul 9 10:45:45 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jul 9 10:45:45 2014 -0700

----------------------------------------------------------------------
 .../streaming/dstream/QueueInputDStream.scala   |  2 +-
 .../spark/streaming/InputStreamsSuite.scala     | 92 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0eb11527/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 6376cff..ed7da6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag](
     if (oneAtATime && queue.size > 0) {
       buffer += queue.dequeue()
     } else {
-      buffer ++= queue
+      buffer ++= queue.dequeueAll(_ => true)
     }
     if (buffer.size > 0) {
       if (oneAtATime) {
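
For readers unfamiliar with the API, a minimal usage sketch of the code path
this hunk fixes (not part of this commit; the object name, local master URL,
and batch interval are illustrative assumptions):

    import scala.collection.mutable.SynchronizedQueue

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object QueueStreamSketch extends App {
      val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamSketch")
      val ssc = new StreamingContext(conf, Seconds(1))

      val queue = new SynchronizedQueue[RDD[Int]]()
      // With oneAtATime = false, all RDDs queued since the previous batch
      // are consumed in a single batch; after this fix they are also
      // removed from the queue, so no RDD is processed twice.
      val stream = ssc.queueStream(queue, oneAtATime = false)
      stream.count().print()

      ssc.start()
      queue += ssc.sparkContext.makeRDD(1 to 100)
      queue += ssc.sparkContext.makeRDD(101 to 200)
      Thread.sleep(3000)
      ssc.stop()
    }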

http://git-wip-us.apache.org/repos/asf/spark/blob/0eb11527/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index cd0aa4d..cc4a650 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
 
 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.rdd.RDD
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
@@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
+
+  test("queue input stream - oneAtATime=true") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = true)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Set up the data queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = input.map(Seq(_))
+    val inputIterator = input.toIterator
+    for (i <- 0 until input.size) {
+      // Enqueue more than one item per tick, but they should be dequeued one at a time
+      inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether the data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+  test("queue input stream - oneAtATime=false") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = false)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Set up the data queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
+
+    // Enqueue the first 3 items (one by one); they should be merged in the next batch
+    val inputIterator = input.toIterator
+    inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+
+    // Enqueue the remaining items (again one by one), to be merged in the final batch
+    inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether the data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
 }