You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/07/09 22:22:21 UTC

spark git commit: [SPARK-7419] [STREAMING] [TESTS] Fix CheckpointSuite.recovery with file input stream

Repository: spark
Updated Branches:
  refs/heads/master 930fe9535 -> 88bf43033


[SPARK-7419] [STREAMING] [TESTS] Fix CheckpointSuite.recovery with file input stream

Fix this failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/2886/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.3,label=centos/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_with_file_input_stream/

To reproduce this failure, add `Thread.sleep(2000)` before this line:
https://github.com/apache/spark/blob/a9c4e29950a14e32acaac547e9a0e8879fd37fc9/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala#L477

Author: zsxwing <zs...@gmail.com>

Closes #7323 from zsxwing/SPARK-7419 and squashes the following commits:

b3caf58 [zsxwing] Fix CheckpointSuite.recovery with file input stream


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bf4303
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bf4303
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bf4303

Branch: refs/heads/master
Commit: 88bf430331eef3c02438ca441616034486e15789
Parents: 930fe95
Author: zsxwing <zs...@gmail.com>
Authored: Thu Jul 9 13:22:17 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Jul 9 13:22:17 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/CheckpointSuite.scala  | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88bf4303/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6b0a3f9..6a94928 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -424,11 +424,11 @@ class CheckpointSuite extends TestSuiteBase {
             }
           }
         }
-        clock.advance(batchDuration.milliseconds)
         eventually(eventuallyTimeout) {
           // Wait until all files have been recorded and all batches have started
           assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
         }
+        clock.advance(batchDuration.milliseconds)
         // Wait for a checkpoint to be written
         eventually(eventuallyTimeout) {
           assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6)
@@ -454,9 +454,12 @@ class CheckpointSuite extends TestSuiteBase {
       // recorded before failure were saved and successfully recovered
       logInfo("*********** RESTARTING ************")
       withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
-        // So that the restarted StreamingContext's clock has gone forward in time since failure
-        ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
-        val oldClockTime = clock.getTimeMillis()
+        // "batchDuration.milliseconds * 3" has elapsed before the StreamingContext is restarted.
+        // The recovery time is read from the checkpoint time, but the original clock is not
+        // aligned with the batch time, so we must add the offset "batchDuration.milliseconds / 2".
+        ssc.conf.set("spark.streaming.manualClock.jump",
+          (batchDuration.milliseconds / 2 + batchDuration.milliseconds * 3).toString)
+        val oldClockTime = clock.getTimeMillis() // 15000ms
         clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
         val batchCounter = new BatchCounter(ssc)
         val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -467,10 +470,10 @@ class CheckpointSuite extends TestSuiteBase {
         ssc.start()
         // Verify that the clock has traveled forward to the expected time
         eventually(eventuallyTimeout) {
-          clock.getTimeMillis() === oldClockTime
+          assert(clock.getTimeMillis() === oldClockTime)
         }
-        // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
-        val numBatchesAfterRestart = 4
+        // There are 5 batches between 6000ms and 15000ms (inclusive).
+        val numBatchesAfterRestart = 5
         eventually(eventuallyTimeout) {
           assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
         }
@@ -483,7 +486,6 @@ class CheckpointSuite extends TestSuiteBase {
             assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
           }
         }
-        clock.advance(batchDuration.milliseconds)
         logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
         assert(outputStream.output.size > 0, "No files processed after restart")
         ssc.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org