You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/01/06 09:34:24 UTC

spark git commit: [SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage

Repository: spark
Updated Branches:
  refs/heads/master 451546aa6 -> a6394bc2c


[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage

This patch refactors Spark Streaming's FileInputDStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600).

Key changes:

- Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock.
- Fix a synchronization issue in ManualClock's `currentTime` method.
- Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished.
- Change the FileInputDStream tests so that files' modification times are set manually from ManualClock's current time; this eliminates many Thread.sleep() calls.
- Update these tests to use the withStreamingContext fixture.

Author: Josh Rosen <jo...@databricks.com>

Closes #3801 from JoshRosen/SPARK-1600 and squashes the following commits:

e4494f4 [Josh Rosen] Address a potential race when setting file modification times
8340bd0 [Josh Rosen] Use set comparisons for output.
0b9c252 [Josh Rosen] Fix some ManualClock usage problems.
1cc689f [Josh Rosen] ConcurrentHashMap -> SynchronizedMap
db26c3a [Josh Rosen] Use standard timeout in ScalaTest `eventually` blocks.
3939432 [Josh Rosen] Rename StreamingTestWaiter to BatchCounter
0b9c3a1 [Josh Rosen] Wait for checkpoint to complete
863d71a [Josh Rosen] Remove Thread.sleep that was used to make task run slowly
b4442c3 [Josh Rosen] batchTimeToSelectedFiles should be thread-safe
15b48ee [Josh Rosen] Replace several TestWaiter methods w/ ScalaTest eventually.
fffc51c [Josh Rosen] Revert "Remove last remaining sleep() call"
dbb8247 [Josh Rosen] Remove last remaining sleep() call
566a63f [Josh Rosen] Fix log message and comment typos
da32f3f [Josh Rosen] Fix log message and comment typos
3689214 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-1600
c8f06b1 [Josh Rosen] Remove Thread.sleep calls in FileInputStream CheckpointSuite test.
d4f2d87 [Josh Rosen] Refactor file input stream tests to not rely on SystemClock.
dda1403 [Josh Rosen] Add StreamingTestWaiter class.
3c3efc3 [Josh Rosen] Synchronize `currentTime` in ManualClock
a95ddc4 [Josh Rosen] Modify FileInputDStream to use Clock class.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6394bc2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6394bc2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6394bc2

Branch: refs/heads/master
Commit: a6394bc2c094c6c662237236c2effa2dabe67910
Parents: 451546a
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Jan 6 00:31:19 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jan 6 00:31:19 2015 -0800

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    |  16 +-
 .../org/apache/spark/streaming/util/Clock.scala |   6 +-
 .../spark/streaming/BasicOperationsSuite.scala  |   2 +-
 .../spark/streaming/CheckpointSuite.scala       | 248 +++++++++++--------
 .../spark/streaming/InputStreamsSuite.scala     |  69 +++---
 .../apache/spark/streaming/TestSuiteBase.scala  |  46 +++-
 6 files changed, 251 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 5f13fdc..e7c5639 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     newFilesOnly: Boolean = true)
   extends InputDStream[(K, V)](ssc_) {
 
+  // This is a def so that it works during checkpoint recovery:
+  private def clock = ssc.scheduler.clock
+
   // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Initial ignore threshold based on which old, existing files in the directory (at the time of
   // starting the streaming application) will be ignored or considered
-  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
 
   /*
    * Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   remember(durationToRemember)
 
   // Map of batch-time to selected file info for the remembered batches
+  // This is a synchronized map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]]
+    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    */
   private def findNewFiles(currentTime: Long): Array[String] = {
     try {
-      lastNewFileFindingTime = System.currentTimeMillis
+      lastNewFileFindingTime = clock.currentTime()
 
       // Calculate ignore threshold
       val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
       }
       val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+      val timeTaken = clock.currentTime() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
       if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
-    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+    batchTimeToSelectedFiles =
+      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index 7cd867c..d6d96d7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
 private[streaming]
 class ManualClock() extends Clock {
 
-  var time = 0L
+  private var time = 0L
 
-  def currentTime() = time
+  def currentTime() = this.synchronized {
+    time
+  }
 
   def setTime(timeToSet: Long) = {
     this.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 199f5e7..e8f4a77 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -638,7 +638,7 @@ class BasicOperationsSuite extends TestSuiteBase {
       if (rememberDuration != null) ssc.remember(rememberDuration)
       val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      assert(clock.time === Seconds(10).milliseconds)
+      assert(clock.currentTime() === Seconds(10).milliseconds)
       assert(output.size === numExpectedOutput)
       operatedStream
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 5d232c6..8f8bc61 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,17 +18,18 @@
 package org.apache.spark.streaming
 
 import java.io.File
-import java.nio.charset.Charset
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
 
+import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -45,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase {
 
   override def batchDuration = Milliseconds(500)
 
-  override def actuallyWait = true // to allow checkpoints to be written
-
   override def beforeFunction() {
     super.beforeFunction()
     Utils.deleteRecursively(new File(checkpointDir))
@@ -143,7 +142,6 @@ class CheckpointSuite extends TestSuiteBase {
     ssc.start()
     advanceTimeWithRealDelay(ssc, 4)
     ssc.stop()
-    System.clearProperty("spark.streaming.manualClock.jump")
     ssc = null
   }
 
@@ -312,109 +310,161 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
-
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the
   // failure, are re-processed or not.
   test("recovery with file input stream") {
     // Set up the streaming context and input streams
+    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
     val testDir = Utils.createTempDir()
-    var ssc = new StreamingContext(master, framework, Seconds(1))
-    ssc.checkpoint(checkpointDir)
-    val fileStream = ssc.textFileStream(testDir.toString)
-    // Making value 3 take large time to process, to ensure that the master
-    // shuts down in the middle of processing the 3rd batch
-    val mappedStream = fileStream.map(s => {
-      val i = s.toInt
-      if (i == 3) Thread.sleep(2000)
-      i
-    })
-
-    // Reducing over a large window to ensure that recovery from master failure
-    // requires reprocessing of all the files seen before the failure
-    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
-    val outputBuffer = new ArrayBuffer[Seq[Int]]
-    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    // Create files and advance manual clock to process them
-    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    Thread.sleep(1000)
-    for (i <- Seq(1, 2, 3)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      // wait to make sure that the file is written such that it gets shown in the file listings
-      Thread.sleep(1000)
+    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+
+    /**
+     * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
+     * modification time to `clock`'s current time.
+     */
+    def writeFile(i: Int, clock: ManualClock): Unit = {
+      val file = new File(testDir, i.toString)
+      Files.write(i + "\n", file, Charsets.UTF_8)
+      assert(file.setLastModified(clock.currentTime()))
+      // Check that the file's modification date is actually the value we wrote, since rounding or
+      // truncation will break the test:
+      assert(file.lastModified() === clock.currentTime())
     }
-    logInfo("Output = " + outputStream.output.mkString(","))
-    assert(outputStream.output.size > 0, "No files processed before restart")
-    ssc.stop()
 
-    // Verify whether files created have been recorded correctly or not
-    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-    // Create files while the master is down
-    for (i <- Seq(4, 5, 6)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
+    /**
+     * Returns ids that identify which files have been recorded by the file input stream.
+     */
+    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+      val fileInputDStream =
+        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
     }
 
-    // Recover context from checkpoint file and verify whether the files that were
-    // recorded before failure were saved and successfully recovered
-    logInfo("*********** RESTARTING ************")
-    ssc = new StreamingContext(checkpointDir)
-    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+    try {
+      // This is a var because it's re-assigned when we restart from a checkpoint
+      var clock: ManualClock = null
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.checkpoint(checkpointDir)
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.textFileStream(testDir.toString)
+        // Make value 3 take a large time to process, to ensure that the driver
+        // shuts down in the middle of processing the 3rd batch
+        CheckpointSuite.batchThreeShouldBlockIndefinitely = true
+        val mappedStream = fileStream.map(s => {
+          val i = s.toInt
+          if (i == 3) {
+            while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
+              Thread.sleep(Long.MaxValue)
+            }
+          }
+          i
+        })
+
+        // Reducing over a large window to ensure that recovery from driver failure
+        // requires reprocessing of all the files seen before the failure
+        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance half a batch so that the first file is created after the StreamingContext starts
+        clock.addToTime(batchDuration.milliseconds / 2)
+        // Create files and advance manual clock to process them
+        for (i <- Seq(1, 2, 3)) {
+          writeFile(i, clock)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          if (i != 3) {
+            // Since we want to shut down while the 3rd batch is processing
+            eventually(eventuallyTimeout) {
+              assert(batchCounter.getNumCompletedBatches === i)
+            }
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        eventually(eventuallyTimeout) {
+          // Wait until all files have been recorded and all batches have started
+          assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
+        }
+        // Wait for a checkpoint to be written
+        val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
+        eventually(eventuallyTimeout) {
+          assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
+        }
+        ssc.stop()
+        // Check that we shut down while the third batch was being processed
+        assert(batchCounter.getNumCompletedBatches === 2)
+        assert(outputStream.output.flatten === Seq(1, 3))
+      }
 
-    // Restart stream computation
-    ssc.start()
-    for (i <- Seq(7, 8, 9)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
-    }
-    Thread.sleep(1000)
-    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
-    assert(outputStream.output.size > 0, "No files processed after restart")
-    ssc.stop()
+      // The original StreamingContext has now been stopped.
+      CheckpointSuite.batchThreeShouldBlockIndefinitely = false
 
-    // Verify whether files created while the driver was down have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
-
-    // Verify whether new files created after recover have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
-
-    // Append the new output to the old buffer
-    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
-    outputBuffer ++= outputStream.output
-
-    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
-    logInfo("--------------------------------")
-    logInfo("output, size = " + outputBuffer.size)
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output, size = " + expectedOutput.size)
-    expectedOutput.foreach(x => logInfo("[" + x + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    val output = outputBuffer.flatMap(x => x)
-    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
-    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
-      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
-    )
-    // To ensure that all the inputs were received correctly
-    assert(expectedOutput.last === output.last)
-    Utils.deleteRecursively(testDir)
+      // Create files while the streaming driver is down
+      for (i <- Seq(4, 5, 6)) {
+        writeFile(i, clock)
+        // Advance the clock after creating the file to avoid a race when
+        // setting its modification time
+        clock.addToTime(batchDuration.milliseconds)
+      }
+
+      // Recover context from checkpoint file and verify whether the files that were
+      // recorded before failure were saved and successfully recovered
+      logInfo("*********** RESTARTING ************")
+      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+        // So that the restarted StreamingContext's clock has gone forward in time since failure
+        ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
+        val oldClockTime = clock.currentTime()
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val batchCounter = new BatchCounter(ssc)
+        val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+        // Check that we remember files that were recorded before the restart
+        assert(recordedFiles(ssc) === Seq(1, 2, 3))
+
+        // Restart stream computation
+        ssc.start()
+        // Verify that the clock has traveled forward to the expected time
+        eventually(eventuallyTimeout) {
+          clock.currentTime() === oldClockTime
+        }
+        // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
+        val numBatchesAfterRestart = 4
+        eventually(eventuallyTimeout) {
+          assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
+        }
+        for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+          writeFile(i, clock)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+        assert(outputStream.output.size > 0, "No files processed after restart")
+        ssc.stop()
+
+        // Verify whether files created while the driver was down (4, 5, 6) and files created after
+        // recovery (7, 8, 9) have been recorded
+        assert(recordedFiles(ssc) === (1 to 9))
+
+        // Append the new output to the old buffer
+        outputBuffer ++= outputStream.output
+
+        // Verify whether all the elements received are as expected
+        val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+        assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+      }
+    } finally {
+      Utils.deleteRecursively(testDir)
+    }
   }
 
 
@@ -471,12 +521,12 @@ class CheckpointSuite extends TestSuiteBase {
    */
   def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    logInfo("Manual clock before advancing = " + clock.time)
+    logInfo("Manual clock before advancing = " + clock.currentTime())
     for (i <- 1 to numBatches.toInt) {
       clock.addToTime(batchDuration.milliseconds)
       Thread.sleep(batchDuration.milliseconds)
     }
-    logInfo("Manual clock after advancing = " + clock.time)
+    logInfo("Manual clock after advancing = " + clock.currentTime())
     Thread.sleep(batchDuration.milliseconds)
 
     val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
@@ -485,3 +535,7 @@ class CheckpointSuite extends TestSuiteBase {
     outputStream.output.map(_.flatten)
   }
 }
+
+private object CheckpointSuite extends Serializable {
+  var batchThreeShouldBlockIndefinitely: Boolean = true
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 307052a..bddf51e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -234,45 +233,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var ssc: StreamingContext = null
     val testDir: File = null
     try {
+      val batchDuration = Seconds(2)
       val testDir = Utils.createTempDir()
+      // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
 
-      Thread.sleep(1000)
       // Set up the streaming context and input streams
-      val newConf = conf.clone.set(
-        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-      ssc = new StreamingContext(newConf, batchDuration)
-      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(fileStream, outputBuffer)
-      outputStream.register()
-      ssc.start()
-
-      // Create files in the directory
-      val input = Seq(1, 2, 3, 4, 5)
-      input.foreach { i =>
-        Thread.sleep(batchDuration.milliseconds)
-        val file = new File(testDir, i.toString)
-        Files.write(i + "\n", file, Charset.forName("UTF-8"))
-        logInfo("Created file " + file)
-      }
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+          testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.addToTime(batchDuration.milliseconds / 2)
+
+        // Over time, create files in the directory
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          val file = new File(testDir, i.toString)
+          Files.write(i + "\n", file, Charset.forName("UTF-8"))
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === i)
+          }
+        }
 
-      // Verify that all the files have been read
-      val expectedOutput = if (newFilesOnly) {
-        input.map(_.toString).toSet
-      } else {
-        (Seq(0) ++ input).map(_.toString).toSet
-      }
-      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        // Verify that all the files have been read
+        val expectedOutput = if (newFilesOnly) {
+          input.map(_.toString).toSet
+        } else {
+          (Seq(0) ++ input).map(_.toString).toSet
+        }
         assert(outputBuffer.flatten.toSet === expectedOutput)
       }
     } finally {
-      if (ssc != null) ssc.stop()
       if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 52972f6..7d82c3e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -21,11 +21,16 @@ import java.io.{ObjectInputStream, IOException}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.SynchronizedBuffer
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.time.{Span, Seconds => ScalaTestSeconds}
+import org.scalatest.concurrent.Eventually.timeout
+import org.scalatest.concurrent.PatienceConfiguration
 
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.rdd.RDD
@@ -104,6 +109,40 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 }
 
 /**
+ * An object that counts the number of started / completed batches. This is implemented using a
+ * StreamingListener. Constructing a new instance automatically registers a StreamingListener on
+ * the given StreamingContext.
+ */
+class BatchCounter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by `BatchCounter.this.synchronized`
+  private var numCompletedBatches = 0
+  private var numStartedBatches = 0
+
+  private val listener = new StreamingListener {
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
+      BatchCounter.this.synchronized {
+        numStartedBatches += 1
+        BatchCounter.this.notifyAll()
+      }
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+      BatchCounter.this.synchronized {
+        numCompletedBatches += 1
+        BatchCounter.this.notifyAll()
+      }
+  }
+  ssc.addStreamingListener(listener)
+
+  def getNumCompletedBatches: Int = this.synchronized {
+    numCompletedBatches
+  }
+
+  def getNumStartedBatches: Int = this.synchronized {
+    numStartedBatches
+  }
+}
+
+/**
  * This is the base trait for Spark Streaming testsuites. This provides basic functionality
  * to run user-defined set of input on user-defined stream operations, and verify the output.
  */
@@ -142,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     .setMaster(master)
     .setAppName(framework)
 
+  // Timeout for use in ScalaTest `eventually` blocks
+  val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
+
   // Default before function for any streaming test suite. Override this
   // if you want to add your stuff to "before" (i.e., don't call before { } )
   def beforeFunction() {
@@ -291,7 +333,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
       // Advance manual clock
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      logInfo("Manual clock before advancing = " + clock.time)
+      logInfo("Manual clock before advancing = " + clock.currentTime())
       if (actuallyWait) {
         for (i <- 1 to numBatches) {
           logInfo("Actually waiting for " + batchDuration)
@@ -301,7 +343,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       } else {
         clock.addToTime(numBatches * batchDuration.milliseconds)
       }
-      logInfo("Manual clock after advancing = " + clock.time)
+      logInfo("Manual clock after advancing = " + clock.currentTime())
 
       // Wait until expected number of output items have been generated
       val startTime = System.currentTimeMillis()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org