You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/09/23 10:29:36 UTC

spark git commit: [SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite.maintains rate controller

Repository: spark
Updated Branches:
  refs/heads/master 44c28abf1 -> 50e463423


[SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite.maintains rate controller

Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
```

In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated.

Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.

So we should make sure the receiver online before running `advanceTimeWithRealDelay(ssc, 2)`.

Author: zsxwing <zs...@gmail.com>

Closes #8877 from zsxwing/SPARK-10769.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50e46342
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50e46342
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50e46342

Branch: refs/heads/master
Commit: 50e4634236668a0195390f0080d0ac230d428d05
Parents: 44c28ab
Author: zsxwing <zs...@gmail.com>
Authored: Wed Sep 23 01:29:30 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Sep 23 01:29:30 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50e46342/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1bba7a1..a695653 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -408,10 +408,14 @@ class CheckpointSuite extends TestSuiteBase {
 
     ssc = new StreamingContext(checkpointDir)
     ssc.start()
-    val outputNew = advanceTimeWithRealDelay(ssc, 2)
 
     eventually(timeout(10.seconds)) {
       assert(RateTestReceiver.getActive().nonEmpty)
+    }
+
+    advanceTimeWithRealDelay(ssc, 2)
+
+    eventually(timeout(10.seconds)) {
       assert(RateTestReceiver.getActive().get.getDefaultBlockGeneratorRateLimit() === 200)
     }
     ssc.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org