Posted to commits@spark.apache.org by td...@apache.org on 2015/09/23 10:29:36 UTC
spark git commit: [SPARK-10769] [STREAMING] [TESTS] Fix
o.a.s.streaming.CheckpointSuite.maintains rate controller
Repository: spark
Updated Branches:
refs/heads/master 44c28abf1 -> 50e463423
[SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite.maintains rate controller
Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
```
This test calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, there is a race condition: both jobs can finish before the receiver is registered. In that case `UpdateRateLimit` is never sent to the receiver, so the value returned by `getDefaultBlockGeneratorRateLimit` is never updated (it stayed at 9223372036854775807 in the failure above).
Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)
15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749
15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered jobs 3000 ms and 3500 ms, but the receiver was registered only after both jobs had finished.
So we should make sure the receiver is online before running `advanceTimeWithRealDelay(ssc, 2)`.
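The ordering problem can be reproduced outside Spark with a minimal sketch. Everything in it is hypothetical and for illustration only: `FakeReceiver`, `FakeTracker`, `waitUntil`, and `observedRate` are stand-ins, not Spark or ScalaTest APIs, and the 200 ms registration delay is an arbitrary assumption. It only shows that an update published before registration is dropped, while waiting for registration first lets the update arrive.

```scala
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

// Hypothetical stand-in for a receiver; not Spark's RateTestReceiver.
final class FakeReceiver {
  // Long.MaxValue mirrors the "not updated" value seen in the failure message.
  val rateLimit = new AtomicLong(Long.MaxValue)
}

// Hypothetical stand-in for the component that forwards rate updates.
final class FakeTracker {
  private val active = new AtomicReference[Option[FakeReceiver]](None)
  def register(r: FakeReceiver): Unit = active.set(Some(r))
  def getActive: Option[FakeReceiver] = active.get()
  // An update published while no receiver is registered is silently dropped.
  def publishRate(rate: Long): Unit = active.get().foreach(_.rateLimit.set(rate))
}

object RaceSketch {
  // Poll `condition` until it holds or `timeoutMs` elapses.
  def waitUntil(timeoutMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(10)
      ok = condition
    }
    ok
  }

  // Returns the rate limit the receiver holds after one update was published.
  def observedRate(waitForReceiver: Boolean): Long = {
    val tracker = new FakeTracker
    val receiver = new FakeReceiver
    // Registration happens asynchronously, after an assumed 200 ms delay.
    val registration = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(200); tracker.register(receiver) }
    })
    registration.start()
    if (waitForReceiver) {
      // The fix: block until the receiver is online before publishing.
      require(waitUntil(10000)(tracker.getActive.nonEmpty), "receiver never registered")
    }
    tracker.publishRate(200)
    registration.join()
    receiver.rateLimit.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"without waiting: ${observedRate(waitForReceiver = false)}")
    println(s"with waiting:    ${observedRate(waitForReceiver = true)}")
  }
}
```

With `waitForReceiver = true` the receiver always ends up with 200. Without it, the result depends on thread timing, and the update is normally lost because it is published before the delayed registration.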
Author: zsxwing <zs...@gmail.com>
Closes #8877 from zsxwing/SPARK-10769.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50e46342
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50e46342
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50e46342
Branch: refs/heads/master
Commit: 50e4634236668a0195390f0080d0ac230d428d05
Parents: 44c28ab
Author: zsxwing <zs...@gmail.com>
Authored: Wed Sep 23 01:29:30 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Sep 23 01:29:30 2015 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/streaming/CheckpointSuite.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/50e46342/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1bba7a1..a695653 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -408,10 +408,14 @@ class CheckpointSuite extends TestSuiteBase {
ssc = new StreamingContext(checkpointDir)
ssc.start()
- val outputNew = advanceTimeWithRealDelay(ssc, 2)
eventually(timeout(10.seconds)) {
assert(RateTestReceiver.getActive().nonEmpty)
+ }
+
+ advanceTimeWithRealDelay(ssc, 2)
+
+ eventually(timeout(10.seconds)) {
assert(RateTestReceiver.getActive().get.getDefaultBlockGeneratorRateLimit() === 200)
}
ssc.stop()