You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/17 00:41:46 UTC

spark git commit: [SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage (branch-1.2 backport)

Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f9d8c5e3f -> 7f19c7c1b


[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage (branch-1.2 backport)

(This PR backports #3801 into `branch-1.2` (1.2.2))

This patch refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600).

Key changes:

- Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock.
- Fix a synchronization issue in ManualClock's `currentTime` method.
- Add a BatchCounter class (backed by a StreamingListener) which counts started and completed batches, so that tests can wait until a certain number of batches have finished.
- Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls.
- Update these tests to use the withStreamingContext fixture.

Author: Josh Rosen <jo...@databricks.com>

Closes #4633 from JoshRosen/spark-1600-b12-backport and squashes the following commits:

e5d3dc4 [Josh Rosen] [SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f19c7c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f19c7c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f19c7c1

Branch: refs/heads/branch-1.2
Commit: 7f19c7c1b4d1942df68c08266c640e9604a50bde
Parents: f9d8c5e
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Feb 16 15:41:38 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 16 15:41:38 2015 -0800

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    |  16 +-
 .../org/apache/spark/streaming/util/Clock.scala |   6 +-
 .../spark/streaming/BasicOperationsSuite.scala  |   2 +-
 .../spark/streaming/CheckpointSuite.scala       | 248 +++++++++++--------
 .../spark/streaming/InputStreamsSuite.scala     |  69 +++---
 .../apache/spark/streaming/TestSuiteBase.scala  |  46 +++-
 6 files changed, 251 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 5f13fdc..e7c5639 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     newFilesOnly: Boolean = true)
   extends InputDStream[(K, V)](ssc_) {
 
+  // This is a def so that it works during checkpoint recovery:
+  private def clock = ssc.scheduler.clock
+
   // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Initial ignore threshold based on which old, existing files in the directory (at the time of
   // starting the streaming application) will be ignored or considered
-  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
 
   /*
    * Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   remember(durationToRemember)
 
   // Map of batch-time to selected file info for the remembered batches
+  // This is a synchronized map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]]
+    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    */
   private def findNewFiles(currentTime: Long): Array[String] = {
     try {
-      lastNewFileFindingTime = System.currentTimeMillis
+      lastNewFileFindingTime = clock.currentTime()
 
       // Calculate ignore threshold
       val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
       }
       val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+      val timeTaken = clock.currentTime() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
       if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
-    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+    batchTimeToSelectedFiles =
+      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index 7cd867c..d6d96d7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
 private[streaming]
 class ManualClock() extends Clock {
 
-  var time = 0L
+  private var time = 0L
 
-  def currentTime() = time
+  def currentTime() = this.synchronized {
+    time
+  }
 
   def setTime(timeToSet: Long) = {
     this.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index f4a269c..9e08f1e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -565,7 +565,7 @@ class BasicOperationsSuite extends TestSuiteBase {
       if (rememberDuration != null) ssc.remember(rememberDuration)
       val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      assert(clock.time === Seconds(10).milliseconds)
+      assert(clock.currentTime() === Seconds(10).milliseconds)
       assert(output.size === numExpectedOutput)
       operatedStream
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c9f6ddc..e685a17 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,17 +18,18 @@
 package org.apache.spark.streaming
 
 import java.io.File
-import java.nio.charset.Charset
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
 
+import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.StreamingContext._
@@ -47,8 +48,6 @@ class CheckpointSuite extends TestSuiteBase {
 
   override def batchDuration = Milliseconds(500)
 
-  override def actuallyWait = true // to allow checkpoints to be written
-
   override def beforeFunction() {
     super.beforeFunction()
     Utils.deleteRecursively(new File(checkpointDir))
@@ -145,7 +144,6 @@ class CheckpointSuite extends TestSuiteBase {
     ssc.start()
     advanceTimeWithRealDelay(ssc, 4)
     ssc.stop()
-    System.clearProperty("spark.streaming.manualClock.jump")
     ssc = null
   }
 
@@ -314,109 +312,161 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
-
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the
   // failure, are re-processed or not.
   test("recovery with file input stream") {
     // Set up the streaming context and input streams
+    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
     val testDir = Utils.createTempDir()
-    var ssc = new StreamingContext(master, framework, Seconds(1))
-    ssc.checkpoint(checkpointDir)
-    val fileStream = ssc.textFileStream(testDir.toString)
-    // Making value 3 take large time to process, to ensure that the master
-    // shuts down in the middle of processing the 3rd batch
-    val mappedStream = fileStream.map(s => {
-      val i = s.toInt
-      if (i == 3) Thread.sleep(2000)
-      i
-    })
-
-    // Reducing over a large window to ensure that recovery from master failure
-    // requires reprocessing of all the files seen before the failure
-    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
-    val outputBuffer = new ArrayBuffer[Seq[Int]]
-    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    // Create files and advance manual clock to process them
-    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    Thread.sleep(1000)
-    for (i <- Seq(1, 2, 3)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      // wait to make sure that the file is written such that it gets shown in the file listings
-      Thread.sleep(1000)
+    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+
+    /**
+     * Creates a file in the test directory whose name is `i` and whose only line is the number
+     * `i`, then stamps it with the current time of `clock` as its modification time.
+     */
+    def writeFile(i: Int, clock: ManualClock): Unit = {
+      val target = new File(testDir, i.toString)
+      Files.write(i + "\n", target, Charsets.UTF_8)
+      assert(target.setLastModified(clock.currentTime()))
+      // Read the timestamp back and compare it with the clock: a filesystem that rounds or
+      // truncates the value we just stored would break the test
+      assert(target.lastModified() === clock.currentTime())
+    }
-    logInfo("Output = " + outputStream.output.mkString(","))
-    assert(outputStream.output.size > 0, "No files processed before restart")
-    ssc.stop()
 
-    // Verify whether files created have been recorded correctly or not
-    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-    // Create files while the master is down
-    for (i <- Seq(4, 5, 6)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
+    /**
+     * Returns the sorted ids of the files recorded by the file input stream. Each id is the last
+     * component of a recorded Hadoop path string, so it is extracted with Path, not File.separator.
+     */
+    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+      val stream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+      val filenames = stream.batchTimeToSelectedFiles.values.flatten
+      filenames.map(name => new Path(name).getName.toInt).toSeq.sorted
+    }
 
-    // Recover context from checkpoint file and verify whether the files that were
-    // recorded before failure were saved and successfully recovered
-    logInfo("*********** RESTARTING ************")
-    ssc = new StreamingContext(checkpointDir)
-    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+    try {
+      // This is a var because it's re-assigned when we restart from a checkpoint
+      var clock: ManualClock = null
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.checkpoint(checkpointDir)
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.textFileStream(testDir.toString)
+        // Make value 3 take a large time to process, to ensure that the driver
+        // shuts down in the middle of processing the 3rd batch
+        CheckpointSuite.batchThreeShouldBlockIndefinitely = true
+        val mappedStream = fileStream.map(s => {
+          val i = s.toInt
+          if (i == 3) {
+            while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
+              Thread.sleep(Long.MaxValue)
+            }
+          }
+          i
+        })
+
+        // Reducing over a large window to ensure that recovery from driver failure
+        // requires reprocessing of all the files seen before the failure
+        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance half a batch so that the first file is created after the StreamingContext starts
+        clock.addToTime(batchDuration.milliseconds / 2)
+        // Create files and advance manual clock to process them
+        for (i <- Seq(1, 2, 3)) {
+          writeFile(i, clock)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          if (i != 3) {
+            // Since we want to shut down while the 3rd batch is processing
+            eventually(eventuallyTimeout) {
+              assert(batchCounter.getNumCompletedBatches === i)
+            }
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        eventually(eventuallyTimeout) {
+          // Wait until all files have been recorded and all batches have started
+          assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
+        }
+        // Wait for a checkpoint to be written
+        val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
+        eventually(eventuallyTimeout) {
+          assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
+        }
+        ssc.stop()
+        // Check that we shut down while the third batch was being processed
+        assert(batchCounter.getNumCompletedBatches === 2)
+        assert(outputStream.output.flatten === Seq(1, 3))
+      }
 
-    // Restart stream computation
-    ssc.start()
-    for (i <- Seq(7, 8, 9)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
-    }
-    Thread.sleep(1000)
-    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
-    assert(outputStream.output.size > 0, "No files processed after restart")
-    ssc.stop()
+      // The original StreamingContext has now been stopped.
+      CheckpointSuite.batchThreeShouldBlockIndefinitely = false
 
-    // Verify whether files created while the driver was down have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
-
-    // Verify whether new files created after recover have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
-
-    // Append the new output to the old buffer
-    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
-    outputBuffer ++= outputStream.output
-
-    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
-    logInfo("--------------------------------")
-    logInfo("output, size = " + outputBuffer.size)
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output, size = " + expectedOutput.size)
-    expectedOutput.foreach(x => logInfo("[" + x + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    val output = outputBuffer.flatMap(x => x)
-    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
-    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
-      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
-    )
-    // To ensure that all the inputs were received correctly
-    assert(expectedOutput.last === output.last)
-    Utils.deleteRecursively(testDir)
+      // Create files while the streaming driver is down
+      for (i <- Seq(4, 5, 6)) {
+        writeFile(i, clock)
+        // Advance the clock after creating the file to avoid a race when
+        // setting its modification time
+        clock.addToTime(batchDuration.milliseconds)
+      }
+
+      // Recover context from checkpoint file and verify whether the files that were
+      // recorded before failure were saved and successfully recovered
+      logInfo("*********** RESTARTING ************")
+      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+        // Three batches elapsed while the driver was down; the extra half batch restores the offset
+        // lost if recovery resumes the clock from a batch-aligned checkpoint time (TODO confirm)
+        val jumpMs = batchDuration.milliseconds * 3 + batchDuration.milliseconds / 2
+        ssc.conf.set("spark.streaming.manualClock.jump", jumpMs.toString)
+        val oldClockTime = clock.currentTime()
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val batchCounter = new BatchCounter(ssc)
+        val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+        // Check that we remember files that were recorded before the restart
+        assert(recordedFiles(ssc) === Seq(1, 2, 3))
+        // Restart stream computation; the recovered clock must land on the pre-restart time
+        ssc.start()
+        eventually(eventuallyTimeout) {
+          assert(clock.currentTime() === oldClockTime)
+        }
+        // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
+        val numBatchesAfterRestart = 4
+        eventually(eventuallyTimeout) {
+          assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
+        }
+        for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+          writeFile(i, clock)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+        assert(outputStream.output.size > 0, "No files processed after restart")
+        ssc.stop()
+
+        // Verify whether files created while the driver was down (4, 5, 6) and files created after
+        // recovery (7, 8, 9) have been recorded
+        assert(recordedFiles(ssc) === (1 to 9))
+
+        // Append the new output to the old buffer
+        outputBuffer ++= outputStream.output
+
+        // Verify whether all the elements received are as expected
+        val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+        assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+      }
+    } finally {
+      Utils.deleteRecursively(testDir)
+    }
   }
 
 
@@ -473,12 +523,12 @@ class CheckpointSuite extends TestSuiteBase {
    */
   def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    logInfo("Manual clock before advancing = " + clock.time)
+    logInfo("Manual clock before advancing = " + clock.currentTime())
     for (i <- 1 to numBatches.toInt) {
       clock.addToTime(batchDuration.milliseconds)
       Thread.sleep(batchDuration.milliseconds)
     }
-    logInfo("Manual clock after advancing = " + clock.time)
+    logInfo("Manual clock after advancing = " + clock.currentTime())
     Thread.sleep(batchDuration.milliseconds)
 
     val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
@@ -487,3 +537,7 @@ class CheckpointSuite extends TestSuiteBase {
     outputStream.output.map(_.flatten)
   }
 }
+
+private object CheckpointSuite extends Serializable {
+  @volatile var batchThreeShouldBlockIndefinitely: Boolean = true  // set by test, read by tasks
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 307052a..bddf51e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -234,45 +233,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var ssc: StreamingContext = null
     val testDir: File = null
     try {
+      val batchDuration = Seconds(2)
       val testDir = Utils.createTempDir()
+      // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
 
-      Thread.sleep(1000)
       // Set up the streaming context and input streams
-      val newConf = conf.clone.set(
-        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-      ssc = new StreamingContext(newConf, batchDuration)
-      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(fileStream, outputBuffer)
-      outputStream.register()
-      ssc.start()
-
-      // Create files in the directory
-      val input = Seq(1, 2, 3, 4, 5)
-      input.foreach { i =>
-        Thread.sleep(batchDuration.milliseconds)
-        val file = new File(testDir, i.toString)
-        Files.write(i + "\n", file, Charset.forName("UTF-8"))
-        logInfo("Created file " + file)
-      }
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+          testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.addToTime(batchDuration.milliseconds / 2)
+
+        // Over time, create files in the directory
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          val file = new File(testDir, i.toString)
+          Files.write(i + "\n", file, Charset.forName("UTF-8"))
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === i)
+          }
+        }
 
-      // Verify that all the files have been read
-      val expectedOutput = if (newFilesOnly) {
-        input.map(_.toString).toSet
-      } else {
-        (Seq(0) ++ input).map(_.toString).toSet
-      }
-      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        // Verify that all the files have been read
+        val expectedOutput = if (newFilesOnly) {
+          input.map(_.toString).toSet
+        } else {
+          (Seq(0) ++ input).map(_.toString).toSet
+        }
         assert(outputBuffer.flatten.toSet === expectedOutput)
       }
     } finally {
-      if (ssc != null) ssc.stop()
       if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 52972f6..7d82c3e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -21,11 +21,16 @@ import java.io.{ObjectInputStream, IOException}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.SynchronizedBuffer
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.time.{Span, Seconds => ScalaTestSeconds}
+import org.scalatest.concurrent.Eventually.timeout
+import org.scalatest.concurrent.PatienceConfiguration
 
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.rdd.RDD
@@ -104,6 +109,40 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 }
 
 /**
+ * An object that counts the number of started / completed batches. This is implemented using a
+ * StreamingListener. Constructing a new instance automatically registers a StreamingListener on
+ * the given StreamingContext.
+ */
+class BatchCounter(ssc: StreamingContext) { counter =>
+
+  // Both counters are only read or written while holding this instance's monitor
+  private var completed = 0
+  private var started = 0
+
+  private val listener = new StreamingListener {
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+      counter.synchronized {
+        started += 1
+        counter.notifyAll()
+      }
+    }
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+      counter.synchronized {
+        completed += 1
+        counter.notifyAll()
+      }
+    }
+  }
+  ssc.addStreamingListener(listener)
+
+  /** Number of batch-completed events observed so far. */
+  def getNumCompletedBatches: Int = counter.synchronized { completed }
+
+  /** Number of batch-started events observed so far. */
+  def getNumStartedBatches: Int = counter.synchronized { started }
+}
+
+/**
  * This is the base trait for Spark Streaming testsuites. This provides basic functionality
  * to run user-defined set of input on user-defined stream operations, and verify the output.
  */
@@ -142,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     .setMaster(master)
     .setAppName(framework)
 
+  // Timeout for use in ScalaTest `eventually` blocks
+  val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
+
   // Default before function for any streaming test suite. Override this
   // if you want to add your stuff to "before" (i.e., don't call before { } )
   def beforeFunction() {
@@ -291,7 +333,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
       // Advance manual clock
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      logInfo("Manual clock before advancing = " + clock.time)
+      logInfo("Manual clock before advancing = " + clock.currentTime())
       if (actuallyWait) {
         for (i <- 1 to numBatches) {
           logInfo("Actually waiting for " + batchDuration)
@@ -301,7 +343,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       } else {
         clock.addToTime(numBatches * batchDuration.milliseconds)
       }
-      logInfo("Manual clock after advancing = " + clock.time)
+      logInfo("Manual clock after advancing = " + clock.currentTime())
 
       // Wait until expected number of output items have been generated
       val startTime = System.currentTimeMillis()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org