You are viewing a plain text version of this content. The canonical link is available in the HTML version of this page.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:48 UTC

[10/33] git commit: Fix CheckpointSuite test failures

Fix CheckpointSuite test failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a8f31638
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a8f31638
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a8f31638

Branch: refs/heads/master
Commit: a8f316386a429c6d73e3e3824ea6eb28b0381cb5
Parents: 578bd1f
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sat Dec 28 21:26:43 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Dec 28 21:26:43 2013 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8f31638/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 315bd54..2a41ec0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -42,8 +42,6 @@ import org.apache.spark.streaming.util.ManualClock
  */
 class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
-  conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-
   before {
     FileUtils.deleteDirectory(new File(checkpointDir))
   }
@@ -135,13 +133,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Restart stream computation from the new checkpoint file to see whether that file has
     // correct checkpoint data
-    conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
     assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
 
-    // Adjust manual clock time as if it is being restarted after a delay
+    // Adjust manual clock time as if it is being restarted after a delay; this is a hack because
+    // we modify the conf object, but it works for this one property
+    ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
     ssc.start()
     advanceTimeWithRealDelay(ssc, 4)
     ssc.stop()