You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/12/03 21:00:13 UTC
spark git commit: [FLAKY-TEST-FIX][STREAMING][TEST] Make sure
StreamingContexts are shut down after test
Repository: spark
Updated Branches:
refs/heads/master ad7cea6f7 -> a02d47277
[FLAKY-TEST-FIX][STREAMING][TEST] Make sure StreamingContexts are shut down after test
Author: Tathagata Das <ta...@gmail.com>
Closes #10124 from tdas/InputStreamSuite-flaky-test.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a02d4727
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a02d4727
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a02d4727
Branch: refs/heads/master
Commit: a02d47277379e1e82d0ee41b2205434f9ffbc3e5
Parents: ad7cea6
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 3 12:00:09 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 3 12:00:09 2015 -0800
----------------------------------------------------------------------
.../spark/streaming/InputStreamsSuite.scala | 122 +++++++++----------
1 file changed, 61 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a02d4727/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 047e38e..3a3176b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -206,28 +206,28 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val numTotalRecords = numThreads * numRecordsPerThread
val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
MultiThreadTestReceiver.haveAllThreadsFinished = false
-
- // set up the network stream using the test receiver
- val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.receiverStream[Int](testReceiver)
- val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
- val outputStream = new TestOutputStream(countStream, outputBuffer)
def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
- outputStream.register()
- ssc.start()
-
- // Let the data from the receiver be received
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val startTime = System.currentTimeMillis()
- while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
- System.currentTimeMillis() - startTime < 5000) {
- Thread.sleep(100)
- clock.advance(batchDuration.milliseconds)
+
+ // set up the network stream using the test receiver
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val networkStream = ssc.receiverStream[Int](testReceiver)
+ val countStream = networkStream.count
+
+ val outputStream = new TestOutputStream(countStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Let the data from the receiver be received
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val startTime = System.currentTimeMillis()
+ while ((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
+ System.currentTimeMillis() - startTime < 5000) {
+ Thread.sleep(100)
+ clock.advance(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
}
- Thread.sleep(1000)
- logInfo("Stopping context")
- ssc.stop()
// Verify whether data received was as expected
logInfo("--------------------------------")
@@ -239,30 +239,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("queue input stream - oneAtATime = true") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val queue = new SynchronizedQueue[RDD[String]]()
- val queueStream = ssc.queueStream(queue, oneAtATime = true)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(queueStream, outputBuffer)
- def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
- outputStream.register()
- ssc.start()
-
- // Setup data queued into the stream
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
- val inputIterator = input.toIterator
- for (i <- 0 until input.size) {
- // Enqueue more than 1 item per tick but they should dequeue one at a time
- inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.advance(batchDuration.milliseconds)
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val queue = new SynchronizedQueue[RDD[String]]()
+ val queueStream = ssc.queueStream(queue, oneAtATime = true)
+ val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Setup data queued into the stream
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+
+ val inputIterator = input.toIterator
+ for (i <- 0 until input.size) {
+ // Enqueue more than 1 item per tick but they should dequeue one at a time
+ inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.advance(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
}
- Thread.sleep(1000)
- logInfo("Stopping context")
- ssc.stop()
// Verify whether data received was as expected
logInfo("--------------------------------")
@@ -282,33 +282,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("queue input stream - oneAtATime = false") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val queue = new SynchronizedQueue[RDD[String]]()
- val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
- outputStream.register()
- ssc.start()
-
- // Setup data queued into the stream
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
- // Enqueue the first 3 items (one by one), they should be merged in the next batch
- val inputIterator = input.toIterator
- inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.advance(batchDuration.milliseconds)
- Thread.sleep(1000)
-
- // Enqueue the remaining items (again one by one), merged in the final batch
- inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.advance(batchDuration.milliseconds)
- Thread.sleep(1000)
- logInfo("Stopping context")
- ssc.stop()
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val queue = new SynchronizedQueue[RDD[String]]()
+ val queueStream = ssc.queueStream(queue, oneAtATime = false)
+ val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Setup data queued into the stream
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+
+ // Enqueue the first 3 items (one by one), they should be merged in the next batch
+ val inputIterator = input.toIterator
+ inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.advance(batchDuration.milliseconds)
+ Thread.sleep(1000)
+
+ // Enqueue the remaining items (again one by one), merged in the final batch
+ inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.advance(batchDuration.milliseconds)
+ Thread.sleep(1000)
+ }
// Verify whether data received was as expected
logInfo("--------------------------------")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org