You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/09 12:23:33 UTC

spark git commit: [SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated

Repository: spark
Updated Branches:
  refs/heads/master e30121afa -> 68ed3632c


[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated

Replace SynchronizeQueue with synchronized access to a Queue

Author: Sean Owen <so...@cloudera.com>

Closes #11111 from srowen/SPARK-13170.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ed3632
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ed3632
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ed3632

Branch: refs/heads/master
Commit: 68ed3632c56389ab3ff4ea5d73c575f224dab4f6
Parents: e30121a
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Feb 9 11:23:29 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 9 11:23:29 2016 +0000

----------------------------------------------------------------------
 .../spark/examples/streaming/QueueStream.scala  |  8 +++--
 .../spark/streaming/StreamingContext.scala      |  4 +--
 .../streaming/dstream/QueueInputDStream.scala   | 13 ++++---
 .../spark/streaming/InputStreamsSuite.scala     | 37 +++++++++++++-------
 4 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
index 13ba9a4..5455aed 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.examples.streaming
 
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable.Queue
 
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
@@ -34,7 +34,7 @@ object QueueStream {
 
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
-    val rddQueue = new SynchronizedQueue[RDD[Int]]()
+    val rddQueue = new Queue[RDD[Int]]()
 
     // Create the QueueInputDStream and use it do some processing
     val inputStream = ssc.queueStream(rddQueue)
@@ -45,7 +45,9 @@ object QueueStream {
 
     // Create and push some RDDs into
     for (i <- 1 to 30) {
-      rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      rddQueue.synchronized {
+        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      }
       Thread.sleep(1000)
     }
     ssc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 32bea88..a1b25c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -459,7 +459,7 @@ class StreamingContext private[streaming] (
    * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
    * those RDDs, so `queueStream` doesn't support checkpointing.
    *
-   * @param queue      Queue of RDDs
+   * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @tparam T         Type of objects in the RDD
    */
@@ -477,7 +477,7 @@ class StreamingContext private[streaming] (
    * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
    * those RDDs, so `queueStream` doesn't support checkpointing.
    *
-   * @param queue      Queue of RDDs
+   * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
    *                   Set as null if no RDD should be returned when empty

http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a8d108d..f9c7869 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag](
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     val buffer = new ArrayBuffer[RDD[T]]()
-    if (oneAtATime && queue.size > 0) {
-      buffer += queue.dequeue()
-    } else {
-      buffer ++= queue.dequeueAll(_ => true)
+    queue.synchronized {
+      if (oneAtATime && queue.nonEmpty) {
+        buffer += queue.dequeue()
+      } else {
+        buffer ++= queue
+        queue.clear()
+      }
     }
-    if (buffer.size > 0) {
+    if (buffer.nonEmpty) {
       if (oneAtATime) {
         Some(buffer.head)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93c8833..fa17b3a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
 import org.apache.spark.util.{ManualClock, Utils}
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // Feed data to the server to send to the network receiver
         val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
         val expectedOutput = input.map(_.toString)
-        for (i <- 0 until input.size) {
+        for (i <- input.indices) {
           testServer.send(input(i).toString + "\n")
           Thread.sleep(500)
           clock.advance(batchDuration.milliseconds)
@@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // Verify whether all the elements received are as expected
         // (whether the elements were received one in each interval is not verified)
         val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-        assert(output.size === expectedOutput.size)
-        for (i <- 0 until output.size) {
+        assert(output.length === expectedOutput.size)
+        for (i <- output.indices) {
           assert(output(i) === expectedOutput(i))
         }
       }
@@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = true)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -256,9 +255,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
 
       val inputIterator = input.toIterator
-      for (i <- 0 until input.size) {
+      for (i <- input.indices) {
         // Enqueue more than 1 item per tick but they should dequeue one at a time
-        inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+        inputIterator.take(2).foreach { i =>
+          queue.synchronized {
+            queue += ssc.sparkContext.makeRDD(Seq(i))
+          }
+        }
         clock.advance(batchDuration.milliseconds)
       }
       Thread.sleep(1000)
@@ -281,13 +284,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   test("queue input stream - oneAtATime = false") {
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = false)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -298,12 +301,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
       // Enqueue the first 3 items (one by one), they should be merged in the next batch
       val inputIterator = input.toIterator
-      inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.take(3).foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
 
       // Enqueue the remaining items (again one by one), merged in the final batch
-      inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org