Posted to commits@spark.apache.org by rx...@apache.org on 2016/12/01 01:41:46 UTC

spark git commit: [SPARK-18617][SPARK-18560][TEST] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly

Repository: spark
Updated Branches:
  refs/heads/master c4979f6ea -> 0a811210f


[SPARK-18617][SPARK-18560][TEST] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Fixed a potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly`, a test added in #16052: the test could return before its asynchronous `ssc.stop` call finished, leaving an active SparkContext behind. I also removed FakeByteArrayReceiver and used TestReceiver directly.
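
For reference, the shape of the fix is roughly the following (a condensed sketch, not the patch itself; `stopAndAwait` is a hypothetical helper name, and a running `StreamingContext` is assumed):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.streaming.StreamingContext

// Sketch of the leak fix: stop the context from a daemon thread, then block
// until the stop has actually finished, so the test method cannot return
// while the SparkContext is still shutting down (and thereby leak it).
def stopAndAwait(ssc: StreamingContext): Unit = {
  val latch = new CountDownLatch(1)
  new Thread("stop-streaming-context") {
    setDaemon(true)
    override def run(): Unit = {
      ssc.stop(stopSparkContext = true, stopGracefully = false)
      latch.countDown() // signal that ssc.stop has returned
    }
  }.start()
  // Fail fast instead of leaking an active SparkContext if the stop hangs.
  assert(latch.await(60, TimeUnit.SECONDS))
}
```

The same `CountDownLatch` handshake appears inlined into the test body in the diff below.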

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16091 from zsxwing/SPARK-18617-follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a811210
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a811210
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a811210

Branch: refs/heads/master
Commit: 0a811210f809eb5b80eae14694d484d45b48b3f6
Parents: c4979f6
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Nov 30 17:41:43 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Nov 30 17:41:43 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/StreamingContextSuite.scala | 34 +++++---------------
 1 file changed, 8 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a811210/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 45d8f50..35eeb9d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.{File, NotSerializableException}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
@@ -811,7 +812,8 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
     val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
     ssc = new StreamingContext(conf, Milliseconds(100))
-    val input = ssc.receiverStream(new FakeByteArrayReceiver)
+    val input = ssc.receiverStream(new TestReceiver)
+    val latch = new CountDownLatch(1)
     input.count().foreachRDD { rdd =>
       // Make sure we can read from BlockRDD
       if (rdd.collect().headOption.getOrElse(0L) > 0) {
@@ -820,12 +822,17 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
           setDaemon(true)
           override def run(): Unit = {
             ssc.stop(stopSparkContext = true, stopGracefully = false)
+            latch.countDown()
           }
         }.start()
       }
     }
     ssc.start()
     ssc.awaitTerminationOrTimeout(60000)
+    // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an
+    // active SparkContext. Note: the stop calls in `after` do nothing while `ssc.stop` in this
+    // test is still running.
+    assert(latch.await(60, TimeUnit.SECONDS))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
@@ -891,31 +898,6 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
-class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
-
-  val data: Array[Byte] = "test".getBytes
-  var receivingThreadOption: Option[Thread] = None
-
-  override def onStart(): Unit = {
-    val thread = new Thread() {
-      override def run() {
-        logInfo("Receiving started")
-        while (!isStopped) {
-          store(data)
-        }
-        logInfo("Receiving stopped")
-      }
-    }
-    receivingThreadOption = Some(thread)
-    thread.start()
-  }
-
-  override def onStop(): Unit = {
-    // no clean to be done, the receiving thread should stop on it own, so just wait for it.
-    receivingThreadOption.foreach(_.join())
-  }
-}
-
 /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
 class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

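For readers wondering why FakeByteArrayReceiver could be dropped: it duplicated the store-until-stopped loop that TestReceiver already provides, so the test gains nothing from a second receiver class. A minimal receiver of that shape looks like this (a sketch for illustration, not the suite's actual class):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal store-until-stopped receiver, the pattern shared by TestReceiver
// and the removed FakeByteArrayReceiver: a background thread pushes records
// via store() until the receiver is stopped.
class MinimalLoopReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
  @volatile private var receivingThread: Option[Thread] = None

  override def onStart(): Unit = {
    val thread = new Thread("minimal-loop-receiver") {
      override def run(): Unit = {
        while (!isStopped) {
          store(1) // the payload is irrelevant; the test only counts records
        }
      }
    }
    receivingThread = Some(thread)
    thread.start()
  }

  override def onStop(): Unit = {
    // The loop exits once isStopped returns true; just wait for the thread.
    receivingThread.foreach(_.join())
  }
}
```

Since the test only counts records, any such receiver works, and TestReceiver already fills the role.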
