Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/09 09:44:59 UTC

spark git commit: [SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming

Repository: spark
Updated Branches:
  refs/heads/master f9307d8fc -> 159198eff


[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming

Building with Scala 2.11 results in the warning "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative". We already use ConcurrentLinkedQueue elsewhere, so let's replace it here as well.
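A minimal sketch of the substitution applied throughout the tests (illustrative only, not part of the patch; the buffer name and element type are arbitrary):

    // Before: deprecated in Scala 2.11, since synchronization via traits is unreliable
    import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    val buffer = new ArrayBuffer[String] with SynchronizedBuffer[String]
    buffer += "event"

    // After: the pattern this patch adopts
    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._
    val queue = new ConcurrentLinkedQueue[String]()
    queue.add("event")
    queue.asScala.foreach(println)   // read through the Scala wrapper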

Some notes for reviewers on how the behaviour differs:
The Seq implicitly converted from a SynchronizedBuffer would continue to receive updates; when we do the same conversion explicitly on a ConcurrentLinkedQueue, this is not the case. Hence some of the (internal & test) APIs are changed to pass an Iterable instead. toSeq is safe to use once there are no more updates.
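For example (a hedged sketch, assuming a test that keeps adding to the queue while assertions read from it):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    val queue = new ConcurrentLinkedQueue[Int]()

    // Live view: iterating this Iterable always reflects the queue's current
    // contents, which is why some internal/test APIs now accept Iterable.
    val liveView: Iterable[Int] = queue.asScala

    queue.add(1)
    assert(liveView.toList == List(1))   // sees the element added after the view was created

    // Snapshot: only take a Seq once no further updates are expected.
    val snapshot: Seq[Int] = queue.asScala.toSeq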

Author: Holden Karau <ho...@us.ibm.com>
Author: tedyu <yu...@gmail.com>

Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/159198ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/159198ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/159198ef

Branch: refs/heads/master
Commit: 159198eff67ee9ead08fba60a585494ea1575147
Parents: f9307d8
Author: Holden Karau <ho...@us.ibm.com>
Authored: Tue Feb 9 08:44:56 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 9 08:44:56 2016 +0000

----------------------------------------------------------------------
 .../spark/streaming/TestOutputStream.scala      |  5 +-
 .../flume/FlumePollingStreamSuite.scala         | 13 ++-
 .../streaming/flume/FlumeStreamSuite.scala      | 16 ++--
 .../kafka/DirectKafkaStreamSuite.scala          | 43 +++++-----
 .../receiver/ReceiverSupervisorImpl.scala       | 17 ++--
 .../apache/spark/streaming/ui/BatchPage.scala   |  2 +-
 .../apache/spark/streaming/ui/BatchUIData.scala |  2 +-
 .../ui/StreamingJobProgressListener.scala       | 21 +++--
 .../apache/spark/streaming/JavaTestUtils.scala  |  4 +-
 .../spark/streaming/BasicOperationsSuite.scala  | 24 +++---
 .../spark/streaming/CheckpointSuite.scala       | 22 ++---
 .../spark/streaming/InputStreamsSuite.scala     | 72 ++++++++--------
 .../spark/streaming/MapWithStateSuite.scala     | 10 +--
 .../spark/streaming/MasterFailureTest.scala     |  9 +-
 .../streaming/StreamingListenerSuite.scala      | 89 ++++++++++----------
 .../apache/spark/streaming/TestSuiteBase.scala  | 36 ++++----
 .../receiver/BlockGeneratorSuite.scala          | 32 ++++---
 .../ui/StreamingJobProgressListenerSuite.scala  |  2 +-
 .../streaming/util/RecurringTimerSuite.scala    | 22 ++---
 19 files changed, 226 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 57374ef..fc02c9f 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
@@ -33,10 +34,10 @@ import org.apache.spark.util.Utils
  * The buffer contains a sequence of RDD's, each containing a sequence of items
  */
 class TestOutputStream[T: ClassTag](parent: DStream[T],
-    val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
+    val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
-    output += collected
+    output.add(collected)
   }, false) {
 
   // This is to clear the output buffer every time it is read from a checkpoint

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 60db846..10dcbf9 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -102,9 +102,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
         utils.eventsPerBatch, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputQueue)
     outputStream.register()
 
     ssc.start()
@@ -115,11 +114,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
 
       // The eventually is required to ensure that all data in the batch has been processed.
       eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val flattenOutputBuffer = outputBuffer.flatten
-        val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
+        val flattenOutput = outputQueue.asScala.toSeq.flatten
+        val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
           case (key, value) => (key.toString, value.toString)
         }).map(_.asJava)
-        val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody))
+        val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
         utils.assertOutput(headers.asJava, bodies.asJava)
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index b29e591..38208c6 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.streaming.flume
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -51,14 +52,14 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
     val input = (1 to 100).map { _.toString }
     val utils = new FlumeTestUtils
     try {
-      val outputBuffer = startContext(utils.getTestPort(), testCompression)
+      val outputQueue = startContext(utils.getTestPort(), testCompression)
 
       eventually(timeout(10 seconds), interval(100 milliseconds)) {
         utils.writeInput(input.asJava, testCompression)
       }
 
       eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val outputEvents = outputBuffer.flatten.map { _.event }
+        val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
         outputEvents.foreach {
           event =>
             event.getHeaders.get("test") should be("header")
@@ -76,16 +77,15 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
 
   /** Setup and start the streaming context */
   private def startContext(
-      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+      testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
     ssc = new StreamingContext(conf, Milliseconds(200))
     val flumeStream = FlumeUtils.createStream(
       ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputQueue)
     outputStream.register()
     ssc.start()
-    outputBuffer
+    outputQueue
   }
 
   /** Class to create socket channel with compression */

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 655b161..8398178 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.File
+import java.util.Arrays
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -101,8 +102,7 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, topics)
     }
 
-    val allReceived =
-      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
 
     // hold a reference to the current offset ranges, so it can be used downstream
     var offsetRanges = Array[OffsetRange]()
@@ -131,11 +131,12 @@ class DirectKafkaStreamSuite
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
     }
-    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
       assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
     }
     ssc.stop()
   }
@@ -173,8 +174,8 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
-    stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
@@ -219,8 +220,8 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
-    stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
@@ -265,7 +266,7 @@ class DirectKafkaStreamSuite
     // This is to collect the raw data received from Kafka
     kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
       val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.appendAll(data)
+      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
     }
 
     // This is to ensure all the data is eventually received only once
@@ -335,14 +336,14 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, Set(topic))
     }
 
-    val allReceived =
-      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]
 
-    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
       assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
 
       // Calculate all the record number collected in the StreamingListener.
       assert(collector.numRecordsSubmitted.get() === totalSent)
@@ -389,17 +390,16 @@ class DirectKafkaStreamSuite
       }
     }
 
-    val collectedData =
-      new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
+    val collectedData = new ConcurrentLinkedQueue[Array[String]]()
 
     // Used for assertion failure messages.
     def dataToString: String =
-      collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
 
     // This is to collect the raw data received from Kafka
     kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
       val data = rdd.map { _._2 }.collect()
-      collectedData += data
+      collectedData.add(data)
     }
 
     ssc.start()
@@ -415,7 +415,7 @@ class DirectKafkaStreamSuite
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
-        assert(collectedData.exists(_.size == expectedSize),
+        assert(collectedData.asScala.exists(_.size == expectedSize),
           s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
       }
     }
@@ -433,7 +433,7 @@ class DirectKafkaStreamSuite
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+  val collectedData = new ConcurrentLinkedQueue[String]()
   @volatile var total = -1L
 
   class InputInfoCollector extends StreamingListener {
@@ -468,4 +468,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
       processingDelay: Long,
       schedulingDelay: Long): Option[Double] = Some(rate)
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index b774b6b..8d4e682 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -19,8 +19,9 @@ package org.apache.spark.streaming.receiver
 
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.base.Throwables
@@ -83,7 +84,7 @@ private[streaming] class ReceiverSupervisorImpl(
           cleanupOldBlocks(threshTime)
         case UpdateRateLimit(eps) =>
           logInfo(s"Received a new rate limit: $eps.")
-          registeredBlockGenerators.foreach { bg =>
+          registeredBlockGenerators.asScala.foreach { bg =>
             bg.updateRate(eps)
           }
       }
@@ -92,8 +93,7 @@ private[streaming] class ReceiverSupervisorImpl(
   /** Unique block ids if one wants to add blocks directly */
   private val newBlockId = new AtomicLong(System.currentTimeMillis())
 
-  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
-    with mutable.SynchronizedBuffer[BlockGenerator]
+  private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
 
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val defaultBlockGeneratorListener = new BlockGeneratorListener {
@@ -170,11 +170,11 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
   override protected def onStart() {
-    registeredBlockGenerators.foreach { _.start() }
+    registeredBlockGenerators.asScala.foreach { _.start() }
   }
 
   override protected def onStop(message: String, error: Option[Throwable]) {
-    registeredBlockGenerators.foreach { _.stop() }
+    registeredBlockGenerators.asScala.foreach { _.stop() }
     env.rpcEnv.stop(endpoint)
   }
 
@@ -194,10 +194,11 @@ private[streaming] class ReceiverSupervisorImpl(
   override def createBlockGenerator(
       blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
     // Cleanup BlockGenerators that have already been stopped
-    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
+    val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
+    stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
 
     val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
-    registeredBlockGenerators += newBlockGenerator
+    registeredBlockGenerators.add(newBlockGenerator)
     newBlockGenerator
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 81de07f..e235afa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -273,7 +273,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
       map { case (outputOpId, outputOpIdAndSparkJobIds) =>
         // sort SparkJobIds for each OutputOpId
-        (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
+        (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).toSeq.sorted)
       }
 
     val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] =

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 3ef3689..1af6085 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -33,7 +33,7 @@ private[ui] case class BatchUIData(
     val processingStartTime: Option[Long],
     val processingEndTime: Option[Long],
     val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(),
-    var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
+    var outputOpIdSparkJobIdPairs: Iterable[OutputOpIdAndSparkJobId] = Seq.empty) {
 
   /**
    * Time taken for the first job of this batch to start processing from the time this batch

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index cacd430..30a3a98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.streaming.ui
 
 import java.util.{LinkedHashMap, Map => JMap, Properties}
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, Queue, SynchronizedBuffer}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, Queue}
 
 import org.apache.spark.scheduler._
 import org.apache.spark.streaming.{StreamingContext, Time}
@@ -41,9 +43,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
   // cannot use a map of (Time, BatchUIData).
   private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
-    new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
+    new LinkedHashMap[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]] {
       override def removeEldestEntry(
-          p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
+          p1: JMap.Entry[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]]): Boolean = {
        // If a lot of "onBatchCompleted"s happen before "onJobStart" (imagine if
         // SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
         // may add some information for a removed batch when processing "onJobStart". It will be a
@@ -131,12 +133,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
       var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
       if (outputOpIdToSparkJobIds == null) {
-        outputOpIdToSparkJobIds =
-          new ArrayBuffer[OutputOpIdAndSparkJobId]()
-            with SynchronizedBuffer[OutputOpIdAndSparkJobId]
+        outputOpIdToSparkJobIds = new ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]()
         batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
       }
-      outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
+      outputOpIdToSparkJobIds.add(OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId))
     }
   }
 
@@ -256,8 +256,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
       }
     }
     batchUIData.foreach { _batchUIData =>
-      val outputOpIdToSparkJobIds =
-        Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
+      // We use an Iterable rather than explicitly converting to a Seq so that updates
+      // will propagate
+      val outputOpIdToSparkJobIds: Iterable[OutputOpIdAndSparkJobId] =
+        Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime).asScala)
+          .getOrElse(Seq.empty)
       _batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
     }
     batchUIData

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 57b50bd..ae44fd0 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -69,7 +69,7 @@ trait JavaTestBase extends TestSuiteBase {
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
     ssc.getState()
     val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
-    res.map(_.asJava).asJava
+    res.map(_.asJava).toSeq.asJava
   }
 
   /**
@@ -85,7 +85,7 @@ trait JavaTestBase extends TestSuiteBase {
     implicit val cm: ClassTag[V] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
     val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
-    res.map(entry => entry.map(_.asJava).asJava).asJava
+    res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 25e7ae8..f1c6479 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.language.existentials
 import scala.reflect.ClassTag
 
@@ -84,9 +86,10 @@ class BasicOperationsSuite extends TestSuiteBase {
     withStreamingContext(setupStreams(input, operation, 2)) { ssc =>
       val output = runStreamsWithPartitions(ssc, 3, 3)
       assert(output.size === 3)
-      val first = output(0)
-      val second = output(1)
-      val third = output(2)
+      val outputArray = output.toArray
+      val first = outputArray(0)
+      val second = outputArray(1)
+      val third = outputArray(2)
 
       assert(first.size === 5)
       assert(second.size === 5)
@@ -104,9 +107,10 @@ class BasicOperationsSuite extends TestSuiteBase {
     withStreamingContext(setupStreams(input, operation, 5)) { ssc =>
       val output = runStreamsWithPartitions(ssc, 3, 3)
       assert(output.size === 3)
-      val first = output(0)
-      val second = output(1)
-      val third = output(2)
+      val outputArray = output.toArray
+      val first = outputArray(0)
+      val second = outputArray(1)
+      val third = outputArray(2)
 
       assert(first.size === 2)
       assert(second.size === 2)
@@ -645,8 +649,8 @@ class BasicOperationsSuite extends TestSuiteBase {
         val networkStream =
           ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
         val mappedStream = networkStream.map(_ + ".").persist()
-        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-        val outputStream = new TestOutputStream(mappedStream, outputBuffer)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(mappedStream, outputQueue)
 
         outputStream.register()
         ssc.start()
@@ -685,7 +689,7 @@ class BasicOperationsSuite extends TestSuiteBase {
         testServer.stop()
 
         // verify data has been received
-        assert(outputBuffer.size > 0)
+        assert(!outputQueue.isEmpty)
         assert(blockRdds.size > 0)
         assert(persistentRddIds.size > 0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 786703e..1f0245a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import com.google.common.base.Charsets
@@ -105,7 +106,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
     val operatedStream = operation(inputStream)
     operatedStream.print()
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
-      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+      new ConcurrentLinkedQueue[Seq[Seq[V]]])
     outputStream.register()
     ssc.checkpoint(checkpointDir)
 
@@ -166,7 +167,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
         // are written to make sure that both of them have been written.
         assert(checkpointFilesOfLatestTime.size === 2)
       }
-      outputStream.output.map(_.flatten)
+      outputStream.output.asScala.map(_.flatten).toSeq
 
     } finally {
       ssc.stop(stopSparkContext = stopSparkContext)
@@ -591,7 +592,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     // Set up the streaming context and input streams
     val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
     val testDir = Utils.createTempDir()
-    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+    val outputBuffer = new ConcurrentLinkedQueue[Seq[Int]]
 
     /**
      * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
@@ -671,7 +672,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         ssc.stop()
         // Check that we shut down while the third batch was being processed
         assert(batchCounter.getNumCompletedBatches === 2)
-        assert(outputStream.output.flatten === Seq(1, 3))
+        assert(outputStream.output.asScala.toSeq.flatten === Seq(1, 3))
       }
 
       // The original StreamingContext has now been stopped.
@@ -721,7 +722,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
             assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
           }
         }
-        logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+        logInfo("Output after restart = " + outputStream.output.asScala.mkString("[", ", ", "]"))
         assert(outputStream.output.size > 0, "No files processed after restart")
         ssc.stop()
 
@@ -730,11 +731,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         assert(recordedFiles(ssc) === (1 to 9))
 
         // Append the new output to the old buffer
-        outputBuffer ++= outputStream.output
+        outputBuffer.addAll(outputStream.output)
 
         // Verify whether all the elements received are as expected
         val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
-        assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+        assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
       }
     } finally {
       Utils.deleteRecursively(testDir)
@@ -894,7 +895,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
    * Advances the manual clock on the streaming scheduler by given number of batches.
    * It also waits for the expected amount of time for each batch.
    */
-  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
+  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long):
+      Iterable[Seq[V]] =
   {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     logInfo("Manual clock before advancing = " + clock.getTimeMillis())
@@ -908,7 +910,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
       dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
     }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
-    outputStream.output.map(_.flatten)
+    outputStream.output.asScala.map(_.flatten)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 75591f0..93c8833 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.streaming
 import java.io.{BufferedWriter, File, OutputStreamWriter}
 import java.net.{ServerSocket, Socket, SocketException}
 import java.nio.charset.Charset
-import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer, SynchronizedQueue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.SynchronizedQueue
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -58,8 +59,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         val batchCounter = new BatchCounter(ssc)
         val networkStream = ssc.socketTextStream(
           "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
-        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-        val outputStream = new TestOutputStream(networkStream, outputBuffer)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(networkStream, outputQueue)
         outputStream.register()
         ssc.start()
 
@@ -90,9 +91,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
         // Verify whether data received was as expected
         logInfo("--------------------------------")
-        logInfo("output.size = " + outputBuffer.size)
+        logInfo("output.size = " + outputQueue.size)
         logInfo("output")
-        outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+        outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
         logInfo("expected output.size = " + expectedOutput.size)
         logInfo("expected output")
         expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -100,7 +101,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
         // Verify whether all the elements received are as expected
         // (whether the elements were received one in each interval is not verified)
-        val output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
+        val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
         assert(output.size === expectedOutput.size)
         for (i <- 0 until output.size) {
           assert(output(i) === expectedOutput(i))
@@ -119,8 +120,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         val batchCounter = new BatchCounter(ssc)
         val networkStream = ssc.socketTextStream(
           "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
-        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-        val outputStream = new TestOutputStream(networkStream, outputBuffer)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(networkStream, outputQueue)
         outputStream.register()
         ssc.start()
 
@@ -156,9 +157,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
         val batchCounter = new BatchCounter(ssc)
         val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
-        val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
-          with SynchronizedBuffer[Seq[Array[Byte]]]
-        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[Array[Byte]]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
         outputStream.register()
         ssc.start()
 
@@ -183,8 +183,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
 
         val expectedOutput = input.map(i => i.toByte)
-        val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
-        assert(obtainedOutput === expectedOutput)
+        val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
+        assert(obtainedOutput.toSeq === expectedOutput)
       }
     } finally {
       if (testDir != null) Utils.deleteRecursively(testDir)
@@ -206,15 +206,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val numTotalRecords = numThreads * numRecordsPerThread
     val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
     MultiThreadTestReceiver.haveAllThreadsFinished = false
-    val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
-    def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
+    def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
 
     // set up the network stream using the test receiver
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       val networkStream = ssc.receiverStream[Int](testReceiver)
       val countStream = networkStream.count
 
-      val outputStream = new TestOutputStream(countStream, outputBuffer)
+      val outputStream = new TestOutputStream(countStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -231,9 +231,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output.size = " + outputQueue.size)
     logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
@@ -241,14 +241,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   test("queue input stream - oneAtATime = true") {
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       val queue = new SynchronizedQueue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = true)
-      val outputStream = new TestOutputStream(queueStream, outputBuffer)
+      val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -266,9 +266,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output.size = " + outputQueue.size)
     logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
     logInfo("expected output.size = " + expectedOutput.size)
     logInfo("expected output")
     expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -276,14 +276,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether all the elements received are as expected
     assert(output.size === expectedOutput.size)
-    for (i <- 0 until output.size) {
-      assert(output(i) === expectedOutput(i))
-    }
+    output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))}
   }
 
   test("queue input stream - oneAtATime = false") {
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
 
@@ -291,7 +289,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       val queue = new SynchronizedQueue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = false)
-      val outputStream = new TestOutputStream(queueStream, outputBuffer)
+      val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -312,9 +310,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output.size = " + outputQueue.size)
     logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
     logInfo("expected output.size = " + expectedOutput.size)
     logInfo("expected output")
     expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -322,9 +320,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether all the elements received are as expected
     assert(output.size === expectedOutput.size)
-    for (i <- 0 until output.size) {
-      assert(output(i) === expectedOutput(i))
-    }
+    output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))}
   }
 
   test("test track the number of input stream") {
@@ -373,8 +369,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         val batchCounter = new BatchCounter(ssc)
         val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
           testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
-        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
         outputStream.register()
         ssc.start()
 
@@ -404,7 +400,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         } else {
           (Seq(0) ++ input).map(_.toString).toSet
         }
-        assert(outputBuffer.flatten.toSet === expectedOutput)
+        assert(outputQueue.asScala.flatten.toSet === expectedOutput)
       }
     } finally {
       if (testDir != null) Utils.deleteRecursively(testDir)

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 2984fd2..b6d6585 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.streaming
 
 import java.io.File
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -550,9 +551,9 @@ class MapWithStateSuite extends SparkFunSuite
     val ssc = new StreamingContext(sc, Seconds(1))
     val inputStream = new TestInputStream(ssc, input, numPartitions = 2)
     val trackeStateStream = inputStream.map(x => (x, 1)).mapWithState(mapWithStateSpec)
-    val collectedOutputs = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+    val collectedOutputs = new ConcurrentLinkedQueue[Seq[T]]
     val outputStream = new TestOutputStream(trackeStateStream, collectedOutputs)
-    val collectedStateSnapshots = new ArrayBuffer[Seq[(K, S)]] with SynchronizedBuffer[Seq[(K, S)]]
+    val collectedStateSnapshots = new ConcurrentLinkedQueue[Seq[(K, S)]]
     val stateSnapshotStream = new TestOutputStream(
       trackeStateStream.stateSnapshots(), collectedStateSnapshots)
     outputStream.register()
@@ -567,7 +568,7 @@ class MapWithStateSuite extends SparkFunSuite
 
     batchCounter.waitUntilBatchesCompleted(numBatches, 10000)
     ssc.stop(stopSparkContext = false)
-    (collectedOutputs, collectedStateSnapshots)
+    (collectedOutputs.asScala.toSeq, collectedStateSnapshots.asScala.toSeq)
   }
 
   private def assert[U](expected: Seq[Seq[U]], collected: Seq[Seq[U]], typ: String) {
@@ -583,4 +584,3 @@ class MapWithStateSuite extends SparkFunSuite
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 7bbbdeb..a02d49e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
 import java.nio.charset.Charset
 import java.util.UUID
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -215,8 +216,8 @@ object MasterFailureTest extends Logging {
 
     while(!isLastOutputGenerated && !isTimedOut) {
       // Get the output buffer
-      val outputBuffer = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
-      def output = outputBuffer.flatMap(x => x)
+      val outputQueue = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
+      def output = outputQueue.asScala.flatten
 
       // Start the thread to kill the streaming after some time
       killed = false
@@ -257,9 +258,9 @@ object MasterFailureTest extends Logging {
 
       // Verify whether the output of each batch has only one element or no element
       // and then merge the new output with all the earlier output
-      mergedOutput ++= output
+      mergedOutput ++= output.toSeq
       totalTimeRan += timeRan
-      logInfo("New output = " + output)
+      logInfo("New output = " + output.toSeq)
       logInfo("Merged output = " + mergedOutput)
       logInfo("Time ran = " + timeRan)
       logInfo("Total time ran = " + totalTimeRan)

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 1ed68c7..66f4739 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.streaming
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, SynchronizedMap}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
@@ -62,43 +65,43 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val batchInfosSubmitted = collector.batchInfosSubmitted
     batchInfosSubmitted should have size 4
 
-    batchInfosSubmitted.foreach(info => {
+    batchInfosSubmitted.asScala.foreach(info => {
       info.schedulingDelay should be (None)
       info.processingDelay should be (None)
       info.totalDelay should be (None)
     })
 
-    batchInfosSubmitted.foreach { info =>
+    batchInfosSubmitted.asScala.foreach { info =>
       info.numRecords should be (1L)
       info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
     }
 
-    isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosSubmitted.asScala.map(_.submissionTime)) should be (true)
 
     // SPARK-6766: processingStartTime of batch info should not be None when starting
     val batchInfosStarted = collector.batchInfosStarted
     batchInfosStarted should have size 4
 
-    batchInfosStarted.foreach(info => {
+    batchInfosStarted.asScala.foreach(info => {
       info.schedulingDelay should not be None
       info.schedulingDelay.get should be >= 0L
       info.processingDelay should be (None)
       info.totalDelay should be (None)
     })
 
-    batchInfosStarted.foreach { info =>
+    batchInfosStarted.asScala.foreach { info =>
       info.numRecords should be (1L)
       info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
     }
 
-    isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
-    isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosStarted.asScala.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosStarted.asScala.map(_.processingStartTime.get)) should be (true)
 
     // test onBatchCompleted
     val batchInfosCompleted = collector.batchInfosCompleted
     batchInfosCompleted should have size 4
 
-    batchInfosCompleted.foreach(info => {
+    batchInfosCompleted.asScala.foreach(info => {
       info.schedulingDelay should not be None
       info.processingDelay should not be None
       info.totalDelay should not be None
@@ -107,14 +110,14 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       info.totalDelay.get should be >= 0L
     })
 
-    batchInfosCompleted.foreach { info =>
+    batchInfosCompleted.asScala.foreach { info =>
       info.numRecords should be (1L)
       info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
     }
 
-    isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
-    isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
-    isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.asScala.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.asScala.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.asScala.map(_.processingEndTime.get)) should be (true)
   }
 
   test("receiver info reporting") {
@@ -129,13 +132,13 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     try {
       eventually(timeout(30 seconds), interval(20 millis)) {
         collector.startedReceiverStreamIds.size should equal (1)
-        collector.startedReceiverStreamIds(0) should equal (0)
-        collector.stoppedReceiverStreamIds should have size 1
-        collector.stoppedReceiverStreamIds(0) should equal (0)
+        collector.startedReceiverStreamIds.peek() should equal (0)
+        collector.stoppedReceiverStreamIds.size should equal (1)
+        collector.stoppedReceiverStreamIds.peek() should equal (0)
         collector.receiverErrors should have size 1
-        collector.receiverErrors(0)._1 should equal (0)
-        collector.receiverErrors(0)._2 should include ("report error")
-        collector.receiverErrors(0)._3 should include ("report exception")
+        collector.receiverErrors.peek()._1 should equal (0)
+        collector.receiverErrors.peek()._2 should include ("report error")
+        collector.receiverErrors.peek()._3 should include ("report exception")
       }
     } finally {
       ssc.stop()
@@ -155,8 +158,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     ssc.start()
     try {
       eventually(timeout(30 seconds), interval(20 millis)) {
-        collector.startedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
-        collector.completedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+        collector.startedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
+        collector.completedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
       }
     } finally {
       ssc.stop()
@@ -271,69 +274,63 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   /** Check if a sequence of numbers is in increasing order */
-  def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
-    for (i <- 1 until seq.size) {
-      if (seq(i - 1) > seq(i)) {
-        return false
-      }
-    }
-    true
+  def isInIncreasingOrder(data: Iterable[Long]): Boolean = {
+    !data.sliding(2).map{itr => itr.size == 2 && itr.head > itr.tail.head }.contains(true)
   }
 }
 
 /** Listener that collects information on processed batches */
 class BatchInfoCollector extends StreamingListener {
-  val batchInfosCompleted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
-  val batchInfosStarted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
-  val batchInfosSubmitted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
+  val batchInfosCompleted = new ConcurrentLinkedQueue[BatchInfo]
+  val batchInfosStarted = new ConcurrentLinkedQueue[BatchInfo]
+  val batchInfosSubmitted = new ConcurrentLinkedQueue[BatchInfo]
 
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
-    batchInfosSubmitted += batchSubmitted.batchInfo
+    batchInfosSubmitted.add(batchSubmitted.batchInfo)
   }
 
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
-    batchInfosStarted += batchStarted.batchInfo
+    batchInfosStarted.add(batchStarted.batchInfo)
   }
 
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    batchInfosCompleted += batchCompleted.batchInfo
+    batchInfosCompleted.add(batchCompleted.batchInfo)
   }
 }
 
 /** Listener that collects information on processed batches */
 class ReceiverInfoCollector extends StreamingListener {
-  val startedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
-  val stoppedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
-  val receiverErrors =
-    new ArrayBuffer[(Int, String, String)] with SynchronizedBuffer[(Int, String, String)]
+  val startedReceiverStreamIds = new ConcurrentLinkedQueue[Int]
+  val stoppedReceiverStreamIds = new ConcurrentLinkedQueue[Int]
+  val receiverErrors = new ConcurrentLinkedQueue[(Int, String, String)]
 
   override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
-    startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
+    startedReceiverStreamIds.add(receiverStarted.receiverInfo.streamId)
   }
 
   override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
-    stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
+    stoppedReceiverStreamIds.add(receiverStopped.receiverInfo.streamId)
   }
 
   override def onReceiverError(receiverError: StreamingListenerReceiverError) {
-    receiverErrors += ((receiverError.receiverInfo.streamId,
-      receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
+    receiverErrors.add((receiverError.receiverInfo.streamId,
+      receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
   }
 }
 
 /** Listener that collects information on processed output operations */
 class OutputOperationInfoCollector extends StreamingListener {
-  val startedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
-  val completedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+  val startedOutputOperationIds = new ConcurrentLinkedQueue[Int]()
+  val completedOutputOperationIds = new ConcurrentLinkedQueue[Int]()
 
   override def onOutputOperationStarted(
       outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
-    startedOutputOperationIds += outputOperationStarted.outputOperationInfo.id
+    startedOutputOperationIds.add(outputOperationStarted.outputOperationInfo.id)
   }
 
   override def onOutputOperationCompleted(
       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
-    completedOutputOperationIds += outputOperationCompleted.outputOperationInfo.id
+    completedOutputOperationIds.add(outputOperationCompleted.outputOperationInfo.id)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 239b108..82cd63b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.streaming
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SynchronizedBuffer
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -87,17 +87,17 @@ class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], n
 
 /**
 * This is an output stream just for the test suites. All the output is collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ * ConcurrentLinkedQueue. This queue is wiped clean on being restored from checkpoint.
  *
- * The buffer contains a sequence of RDD's, each containing a sequence of items
+ * The queue contains a sequence of RDD's, each containing a sequence of items.
  */
 class TestOutputStream[T: ClassTag](
     parent: DStream[T],
-    val output: SynchronizedBuffer[Seq[T]] =
-      new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+    val output: ConcurrentLinkedQueue[Seq[T]] =
+      new ConcurrentLinkedQueue[Seq[T]]()
   ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
-    output += collected
+    output.add(collected)
   }, false) {
 
   // This is to clear the output buffer every time it is read from a checkpoint
@@ -110,18 +110,18 @@ class TestOutputStream[T: ClassTag](
 
 /**
 * This is an output stream just for the test suites. All the output is collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ * ConcurrentLinkedQueue. This queue is wiped clean on being restored from checkpoint.
  *
- * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
+ * The queue contains a sequence of RDD's, each containing a sequence of partitions, each
  * containing a sequence of items.
  */
 class TestOutputStreamWithPartitions[T: ClassTag](
     parent: DStream[T],
-    val output: SynchronizedBuffer[Seq[Seq[T]]] =
-      new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
+    val output: ConcurrentLinkedQueue[Seq[Seq[T]]] =
+      new ConcurrentLinkedQueue[Seq[Seq[T]]]())
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
-    output += collected
+    output.add(collected)
   }, false) {
 
   // This is to clear the output buffer every time it is read from a checkpoint
@@ -322,7 +322,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     val inputStream = new TestInputStream(ssc, input, numPartitions)
     val operatedStream = operation(inputStream)
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
-      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+      new ConcurrentLinkedQueue[Seq[Seq[V]]])
     outputStream.register()
     ssc
   }
@@ -347,7 +347,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
     val operatedStream = operation(inputStream1, inputStream2)
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
-      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
+      new ConcurrentLinkedQueue[Seq[Seq[W]]])
     outputStream.register()
     ssc
   }
@@ -418,7 +418,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       }
       val timeTaken = System.currentTimeMillis() - startTime
       logInfo("Output generated in " + timeTaken + " milliseconds")
-      output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+      output.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
       assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
       assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
 
@@ -426,7 +426,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     } finally {
       ssc.stop(stopSparkContext = true)
     }
-    output
+    output.asScala.toSeq
   }
 
   /**
@@ -501,7 +501,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
     withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
       val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
-      verifyOutput[V](output, expectedOutput, useSet)
+      verifyOutput[V](output.toSeq, expectedOutput, useSet)
     }
   }
 
@@ -540,7 +540,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
     withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
       val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
-      verifyOutput[W](output, expectedOutput, useSet)
+      verifyOutput[W](output.toSeq, expectedOutput, useSet)
     }
   }
 }
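
The behavioural note in the description -- a Seq implicitly derived from a SynchronizedBuffer kept receiving updates, while an explicit conversion from a ConcurrentLinkedQueue does not -- comes down to view vs. copy. A small illustrative sketch (not part of this patch): asScala is a live wrapper over the queue, whereas an eager copy such as toList is frozen when it is taken; the suites above therefore convert with asScala.toSeq only after the streaming context has stopped and no further batches can arrive.

    import java.util.concurrent.ConcurrentLinkedQueue

    import scala.collection.JavaConverters._

    object QueueConversionSketch {
      def main(args: Array[String]): Unit = {
        val output = new ConcurrentLinkedQueue[Seq[Int]]()
        output.add(Seq(1, 2, 3))

        val view = output.asScala        // wrapper over the queue: reflects later adds
        val copy = output.asScala.toList // eager copy of the current contents

        output.add(Seq(4, 5, 6))

        assert(view.size == 2)   // the live view sees the second batch
        assert(copy.size == 1)   // the eager copy taken earlier does not
        println(s"view=${view.toList}, copy=$copy")
      }
    }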

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index f5ec0ff..a1d0561 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.streaming.receiver
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
@@ -84,7 +87,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
         assert(listener.onPushBlockCalled === true)
       }
     }
-    listener.pushedData should contain theSameElementsInOrderAs (data1)
+    listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (data1)
     assert(listener.onAddDataCalled === false) // should be called only with addDataWithCallback()
 
     // Verify addDataWithCallback() adds data+metadata and callbacks are called correctly
@@ -92,21 +95,24 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
     val metadata2 = data2.map { _.toString }
     data2.zip(metadata2).foreach { case (d, m) => blockGenerator.addDataWithCallback(d, m) }
     assert(listener.onAddDataCalled === true)
-    listener.addedData should contain theSameElementsInOrderAs (data2)
-    listener.addedMetadata should contain theSameElementsInOrderAs (metadata2)
+    listener.addedData.asScala.toSeq should contain theSameElementsInOrderAs (data2)
+    listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (metadata2)
     clock.advance(blockIntervalMs)  // advance clock to generate blocks
     eventually(timeout(1 second)) {
-      listener.pushedData should contain theSameElementsInOrderAs (data1 ++ data2)
+      val combined = data1 ++ data2
+      listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
     }
 
     // Verify addMultipleDataWithCallback() adds data+metadata and callbacks are called correctly
     val data3 = 21 to 30
     val metadata3 = "metadata"
     blockGenerator.addMultipleDataWithCallback(data3.iterator, metadata3)
-    listener.addedMetadata should contain theSameElementsInOrderAs (metadata2 :+ metadata3)
+    val combinedMetadata = metadata2 :+ metadata3
+    listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (combinedMetadata)
     clock.advance(blockIntervalMs)  // advance clock to generate blocks
     eventually(timeout(1 second)) {
-      listener.pushedData should contain theSameElementsInOrderAs (data1 ++ data2 ++ data3)
+      val combinedData = data1 ++ data2 ++ data3
+      listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (combinedData)
     }
 
     // Stop the block generator by starting the stop on a different thread and
@@ -191,7 +197,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
       assert(thread.isAlive === false)
     }
     assert(blockGenerator.isStopped() === true) // generator has finally been completely stopped
-    assert(listener.pushedData === data, "All data not pushed by stop()")
+    assert(listener.pushedData.asScala.toSeq === data, "All data not pushed by stop()")
   }
 
   test("block push errors are reported") {
@@ -231,15 +237,15 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
 
   /** A listener for BlockGenerator that records the data in the callbacks */
   private class TestBlockGeneratorListener extends BlockGeneratorListener {
-    val pushedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
-    val addedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
-    val addedMetadata = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
+    val pushedData = new ConcurrentLinkedQueue[Any]
+    val addedData = new ConcurrentLinkedQueue[Any]
+    val addedMetadata = new ConcurrentLinkedQueue[Any]
     @volatile var onGenerateBlockCalled = false
     @volatile var onAddDataCalled = false
     @volatile var onPushBlockCalled = false
 
     override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      pushedData ++= arrayBuffer
+      pushedData.addAll(arrayBuffer.asJava)
       onPushBlockCalled = true
     }
     override def onError(message: String, throwable: Throwable): Unit = {}
@@ -247,8 +253,8 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
       onGenerateBlockCalled = true
     }
     override def onAddData(data: Any, metadata: Any): Unit = {
-      addedData += data
-      addedMetadata += metadata
+      addedData.add(data)
+      addedMetadata.add(metadata)
       onAddDataCalled = true
     }
   }
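
A hedged sketch of the listener pattern in BlockGeneratorSuite (illustrative only; RecordingListener and the values are invented for this example): callback data lands in a ConcurrentLinkedQueue, a whole Scala buffer is appended in one call with addAll(buffer.asJava), and assertions compare the asScala view against a Seq once the callbacks have finished.

    import java.util.concurrent.ConcurrentLinkedQueue

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    object ListenerQueueSketch {
      // Stand-in for BlockGeneratorListener: callbacks may fire on several
      // threads, so collected data goes into a lock-free queue rather than a
      // SynchronizedBuffer.
      class RecordingListener {
        val pushedData = new ConcurrentLinkedQueue[Any]()

        def onPushBlock(buffer: mutable.ArrayBuffer[_]): Unit = {
          // addAll expects a java.util.Collection, hence the asJava conversion.
          pushedData.addAll(buffer.asJava)
        }
      }

      def main(args: Array[String]): Unit = {
        val listener = new RecordingListener
        listener.onPushBlock(mutable.ArrayBuffer(1, 2, 3))
        listener.onPushBlock(mutable.ArrayBuffer(4, 5))
        // Compare against a Seq once the callbacks are done.
        assert(listener.pushedData.asScala.toSeq == Seq(1, 2, 3, 4, 5))
        println("collected: " + listener.pushedData.asScala.mkString(", "))
      }
    }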

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 34cd743..26b757c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -200,7 +200,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
     batchUIData.get.streamIdToInputInfo should be (Map.empty)
     batchUIData.get.numRecords should be (0)
-    batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
+    batchUIData.get.outputOpIdSparkJobIdPairs.toSeq should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
 
     // A lot of "onBatchCompleted"s happen before "onJobStart"
     for(i <- limit + 1 to limit * 2) {

http://git-wip-us.apache.org/repos/asf/spark/blob/159198ef/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
index 0544972..25b70a3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.streaming.util
 
-import scala.collection.mutable
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import org.scalatest.PrivateMethodTester
@@ -30,34 +32,34 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
 
   test("basic") {
     val clock = new ManualClock()
-    val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
+    val results = new ConcurrentLinkedQueue[Long]()
     val timer = new RecurringTimer(clock, 100, time => {
-      results += time
+      results.add(time)
     }, "RecurringTimerSuite-basic")
     timer.start(0)
     eventually(timeout(10.seconds), interval(10.millis)) {
-      assert(results === Seq(0L))
+      assert(results.asScala.toSeq === Seq(0L))
     }
     clock.advance(100)
     eventually(timeout(10.seconds), interval(10.millis)) {
-      assert(results === Seq(0L, 100L))
+      assert(results.asScala.toSeq === Seq(0L, 100L))
     }
     clock.advance(200)
     eventually(timeout(10.seconds), interval(10.millis)) {
-      assert(results === Seq(0L, 100L, 200L, 300L))
+      assert(results.asScala.toSeq === Seq(0L, 100L, 200L, 300L))
     }
     assert(timer.stop(interruptTimer = true) === 300L)
   }
 
   test("SPARK-10224: call 'callback' after stopping") {
     val clock = new ManualClock()
-    val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
+    val results = new ConcurrentLinkedQueue[Long]
     val timer = new RecurringTimer(clock, 100, time => {
-      results += time
+      results.add(time)
     }, "RecurringTimerSuite-SPARK-10224")
     timer.start(0)
     eventually(timeout(10.seconds), interval(10.millis)) {
-      assert(results === Seq(0L))
+      assert(results.asScala.toSeq === Seq(0L))
     }
     @volatile var lastTime = -1L
     // Now RecurringTimer is waiting for the next interval
@@ -77,7 +79,7 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
     // Then it will find `stopped` is true and exit the loop, but it should call `callback` again
     // before exiting its internal thread.
     thread.join()
-    assert(results === Seq(0L, 100L, 200L))
+    assert(results.asScala.toSeq === Seq(0L, 100L, 200L))
     assert(lastTime === 200L)
   }
 }
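
Finally, a small illustrative sketch of the RecurringTimerSuite pattern (the thread and latch stand in for the timer and are not part of RecurringTimer): a callback running on another thread records trigger times into a ConcurrentLinkedQueue, and because the queue preserves insertion order the asScala.toSeq conversion can be compared directly against the expected sequence of times.

    import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}

    import scala.collection.JavaConverters._

    object TimerCallbackSketch {
      def main(args: Array[String]): Unit = {
        val results = new ConcurrentLinkedQueue[Long]()
        val done = new CountDownLatch(1)

        // In RecurringTimer the callback fires on the timer's own thread; a
        // plain thread stands in for it here.
        val callback: Long => Unit = time => { results.add(time); () }

        val worker = new Thread(new Runnable {
          override def run(): Unit = {
            Seq(0L, 100L, 200L).foreach(callback)
            done.countDown()
          }
        })
        worker.start()
        done.await()

        // ConcurrentLinkedQueue keeps insertion order, so the converted Seq can
        // be compared directly against the expected trigger times.
        assert(results.asScala.toSeq == Seq(0L, 100L, 200L))
        println("callback times: " + results.asScala.mkString(", "))
      }
    }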

