You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/09/14 21:36:12 UTC

[spark] branch branch-2.4 updated: [SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 637a6c2  [SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout
637a6c2 is described below

commit 637a6c2750be8d4f42b1fd11c4cca8d0067e80d8
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Wed Sep 11 13:31:43 2019 -0700

    [SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout
    
    ### What changes were proposed in this pull request?
    
    This patch fixes the flaky StreamingContextSuite test "stop slow receiver gracefully" by adding a flag that records whether the slow receiver has finished initializing, and by waiting for that flag to become true. Because the receiver is submitted via a job and initialized in an executor, 500ms might not be enough to cover all cases.
    
    ### Why are the changes needed?
    
    We have received several reports of this test failing. Please refer to [SPARK-24663](https://issues.apache.org/jira/browse/SPARK-24663).
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Modified UT. I artificially delayed the handling of job submission by adding the code below to `DAGScheduler.submitJob`:
    
    ```
    if (rdd != null && rdd.name != null && rdd.name.startsWith("Receiver")) {
      println(s"Receiver Job! rdd name: ${rdd.name}")
      Thread.sleep(1000)
    }
    ```
    
    and the test "stop slow receiver gracefully" failed on current master and passed on the patch.
    
    Closes #25725 from HeartSaVioR/SPARK-24663.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../org/apache/spark/streaming/StreamingContextSuite.scala   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 7e1b411..be39168 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -355,7 +355,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     logInfo("==================================\n\n\n")
     ssc = new StreamingContext(sc, Milliseconds(100))
     var runningCount = 0
-    SlowTestReceiver.receivedAllRecords = false
     // Create test receiver that sleeps in onStop()
     val totalNumRecords = 15
     val recordsPerSecond = 1
@@ -367,6 +366,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     }
     ssc.start()
     ssc.awaitTerminationOrTimeout(500)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      assert(SlowTestReceiver.initialized)
+    }
     ssc.stop(stopSparkContext = false, stopGracefully = true)
     logInfo("Running count = " + runningCount)
     assert(runningCount > 0)
@@ -958,6 +960,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
 
   var receivingThreadOption: Option[Thread] = None
+  @volatile var receivedAllRecords = false
 
   def onStart() {
     val thread = new Thread() {
@@ -967,17 +970,18 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
           Thread.sleep(1000 / recordsPerSecond)
           store(i)
         }
-        SlowTestReceiver.receivedAllRecords = true
+        receivedAllRecords = true
         logInfo(s"Received all $totalRecords records")
       }
     }
     receivingThreadOption = Some(thread)
     thread.start()
+    SlowTestReceiver.initialized = true
   }
 
   def onStop() {
     // Simulate slow receiver by waiting for all records to be produced
-    while (!SlowTestReceiver.receivedAllRecords) {
+    while (!receivedAllRecords) {
       Thread.sleep(100)
     }
     // no clean to be done, the receiving thread should stop on it own
@@ -985,7 +989,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
 }
 
 object SlowTestReceiver {
-  var receivedAllRecords = false
+  var initialized = false
 }
 
 /** Streaming application for testing DStream and RDD creation sites */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org