You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/12/03 21:00:13 UTC

spark git commit: [FLAKY-TEST-FIX][STREAMING][TEST] Make sure StreamingContexts are shut down after test

Repository: spark
Updated Branches:
  refs/heads/master ad7cea6f7 -> a02d47277


[FLAKY-TEST-FIX][STREAMING][TEST] Make sure StreamingContexts are shut down after test

Author: Tathagata Das <ta...@gmail.com>

Closes #10124 from tdas/InputStreamSuite-flaky-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a02d4727
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a02d4727
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a02d4727

Branch: refs/heads/master
Commit: a02d47277379e1e82d0ee41b2205434f9ffbc3e5
Parents: ad7cea6
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 3 12:00:09 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 3 12:00:09 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/InputStreamsSuite.scala     | 122 +++++++++----------
 1 file changed, 61 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a02d4727/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 047e38e..3a3176b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -206,28 +206,28 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val numTotalRecords = numThreads * numRecordsPerThread
     val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
     MultiThreadTestReceiver.haveAllThreadsFinished = false
-
-    // set up the network stream using the test receiver
-    val ssc = new StreamingContext(conf, batchDuration)
-    val networkStream = ssc.receiverStream[Int](testReceiver)
-    val countStream = networkStream.count
     val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
-    val outputStream = new TestOutputStream(countStream, outputBuffer)
     def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
-    outputStream.register()
-    ssc.start()
-
-    // Let the data from the receiver be received
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val startTime = System.currentTimeMillis()
-    while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
-      System.currentTimeMillis() - startTime < 5000) {
-      Thread.sleep(100)
-      clock.advance(batchDuration.milliseconds)
+
+    // set up the network stream using the test receiver
+    withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+      val networkStream = ssc.receiverStream[Int](testReceiver)
+      val countStream = networkStream.count
+
+      val outputStream = new TestOutputStream(countStream, outputBuffer)
+      outputStream.register()
+      ssc.start()
+
+      // Let the data from the receiver be received
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      val startTime = System.currentTimeMillis()
+      while ((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
+        System.currentTimeMillis() - startTime < 5000) {
+        Thread.sleep(100)
+        clock.advance(batchDuration.milliseconds)
+      }
+      Thread.sleep(1000)
     }
-    Thread.sleep(1000)
-    logInfo("Stopping context")
-    ssc.stop()
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
@@ -239,30 +239,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   test("queue input stream - oneAtATime = true") {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val queue = new SynchronizedQueue[RDD[String]]()
-    val queueStream = ssc.queueStream(queue, oneAtATime = true)
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    val outputStream = new TestOutputStream(queueStream, outputBuffer)
-    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
-    outputStream.register()
-    ssc.start()
-
-    // Setup data queued into the stream
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
 
-    val inputIterator = input.toIterator
-    for (i <- 0 until input.size) {
-      // Enqueue more than 1 item per tick but they should dequeue one at a time
-      inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
-      clock.advance(batchDuration.milliseconds)
+    // Set up the streaming context and input streams
+    withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+      val queue = new SynchronizedQueue[RDD[String]]()
+      val queueStream = ssc.queueStream(queue, oneAtATime = true)
+      val outputStream = new TestOutputStream(queueStream, outputBuffer)
+      outputStream.register()
+      ssc.start()
+
+      // Setup data queued into the stream
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+
+      val inputIterator = input.toIterator
+      for (i <- 0 until input.size) {
+        // Enqueue more than 1 item per tick but they should dequeue one at a time
+        inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+        clock.advance(batchDuration.milliseconds)
+      }
+      Thread.sleep(1000)
     }
-    Thread.sleep(1000)
-    logInfo("Stopping context")
-    ssc.stop()
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
@@ -282,33 +282,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   test("queue input stream - oneAtATime = false") {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val queue = new SynchronizedQueue[RDD[String]]()
-    val queueStream = ssc.queueStream(queue, oneAtATime = false)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    val outputStream = new TestOutputStream(queueStream, outputBuffer)
     def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
-    outputStream.register()
-    ssc.start()
-
-    // Setup data queued into the stream
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
 
-    // Enqueue the first 3 items (one by one), they should be merged in the next batch
-    val inputIterator = input.toIterator
-    inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
-    clock.advance(batchDuration.milliseconds)
-    Thread.sleep(1000)
-
-    // Enqueue the remaining items (again one by one), merged in the final batch
-    inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
-    clock.advance(batchDuration.milliseconds)
-    Thread.sleep(1000)
-    logInfo("Stopping context")
-    ssc.stop()
+    // Set up the streaming context and input streams
+    withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+      val queue = new SynchronizedQueue[RDD[String]]()
+      val queueStream = ssc.queueStream(queue, oneAtATime = false)
+      val outputStream = new TestOutputStream(queueStream, outputBuffer)
+      outputStream.register()
+      ssc.start()
+
+      // Setup data queued into the stream
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+
+      // Enqueue the first 3 items (one by one), they should be merged in the next batch
+      val inputIterator = input.toIterator
+      inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      clock.advance(batchDuration.milliseconds)
+      Thread.sleep(1000)
+
+      // Enqueue the remaining items (again one by one), merged in the final batch
+      inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      clock.advance(batchDuration.milliseconds)
+      Thread.sleep(1000)
+    }
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org