You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/22 01:23:51 UTC

spark git commit: [FLAKY-TEST] InputStreamsSuite.socket input stream

Repository: spark
Updated Branches:
  refs/heads/master 7e8994ffd -> afe36516e


[FLAKY-TEST] InputStreamsSuite.socket input stream

## What changes were proposed in this pull request?

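Fixes the flaky "socket input stream" test in InputStreamsSuite. The test no longer relies on a fixed `Thread.sleep(500)` between sends, a fixed wait for batch completion, or a fixed wait for the listener bus to drain; instead, the output and progress-listener assertions are retried inside `eventually` blocks, with the clock advanced on each retry of the output check.

Test details:
https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream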

## How was this patch tested?

Tested 2,000 times.

Author: Burak Yavuz <br...@gmail.com>

Closes #16343 from brkyvz/sock.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afe36516
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afe36516
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afe36516

Branch: refs/heads/master
Commit: afe36516e4b4031196ee2e0a04980ac49208ea6b
Parents: 7e8994f
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Dec 21 17:23:48 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 21 17:23:48 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/InputStreamsSuite.scala     | 55 ++++++++------------
 1 file changed, 23 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afe36516/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 9ecfa48..6fb50a4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         val expectedOutput = input.map(_.toString)
         for (i <- input.indices) {
           testServer.send(input(i).toString + "\n")
-          Thread.sleep(500)
           clock.advance(batchDuration.milliseconds)
         }
-        // Make sure we finish all batches before "stop"
-        if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
-          fail("Timeout: cannot finish all batches in 30 seconds")
+
+        eventually(eventuallyTimeout) {
+          clock.advance(batchDuration.milliseconds)
+          // Verify whether data received was as expected
+          logInfo("--------------------------------")
+          logInfo("output.size = " + outputQueue.size)
+          logInfo("output")
+          outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+          logInfo("expected output.size = " + expectedOutput.size)
+          logInfo("expected output")
+          expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+          logInfo("--------------------------------")
+
+          // Verify whether all the elements received are as expected
+          // (whether the elements were received one in each interval is not verified)
+          val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
+          assert(output.length === expectedOutput.size)
+          for (i <- output.indices) {
+            assert(output(i) === expectedOutput(i))
+          }
         }
 
-        // Ensure progress listener has been notified of all events
-        ssc.sparkContext.listenerBus.waitUntilEmpty(500)
-
-        // Verify all "InputInfo"s have been reported
-        assert(ssc.progressListener.numTotalReceivedRecords === input.size)
-        assert(ssc.progressListener.numTotalProcessedRecords === input.size)
-
-        logInfo("Stopping server")
-        testServer.stop()
-        logInfo("Stopping context")
-        ssc.stop()
-
-        // Verify whether data received was as expected
-        logInfo("--------------------------------")
-        logInfo("output.size = " + outputQueue.size)
-        logInfo("output")
-        outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-        logInfo("expected output.size = " + expectedOutput.size)
-        logInfo("expected output")
-        expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-        logInfo("--------------------------------")
-
-        // Verify whether all the elements received are as expected
-        // (whether the elements were received one in each interval is not verified)
-        val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-        assert(output.length === expectedOutput.size)
-        for (i <- output.indices) {
-          assert(output(i) === expectedOutput(i))
+        eventually(eventuallyTimeout) {
+          assert(ssc.progressListener.numTotalReceivedRecords === input.length)
+          assert(ssc.progressListener.numTotalProcessedRecords === input.length)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org