You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/11/24 22:50:25 UTC

spark git commit: [SPARK-4518][SPARK-4519][Streaming] Refactored file stream to prevent files from being processed multiple times

Repository: spark
Updated Branches:
  refs/heads/master 4a90276ab -> cb0e9b098


[SPARK-4518][SPARK-4519][Streaming] Refactored file stream to prevent files from being processed multiple times

Because of a corner case, a file already selected for batch t can get considered again for batch t+2. This refactoring fixes it by remembering all the files selected in the last 1 minute, so that this corner case does not arise. Also uses spark context's hadoop configuration to access the file system API for listing directories.

pwendell Please take look. I still have not run long-running integration tests, so I cannot say for sure whether this has indeed solved the issue. You could do a first pass on this in the meantime.

Author: Tathagata Das <ta...@gmail.com>

Closes #3419 from tdas/filestream-fix2 and squashes the following commits:

c19dd8a [Tathagata Das] Addressed PR comments.
513b608 [Tathagata Das] Updated docs.
d364faf [Tathagata Das] Added the current time condition back
5526222 [Tathagata Das] Removed unnecessary imports.
38bb736 [Tathagata Das] Fix long line.
203bbc7 [Tathagata Das] Un-ignore tests.
eaef4e1 [Tathagata Das] Fixed SPARK-4519
9dbd40a [Tathagata Das] Refactored FileInputDStream to remember last few batches.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb0e9b09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb0e9b09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb0e9b09

Branch: refs/heads/master
Commit: cb0e9b0980f38befe88bf52aa037fe33262730f7
Parents: 4a90276
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Nov 24 13:50:20 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Nov 24 13:50:20 2014 -0800

----------------------------------------------------------------------
 .../spark/streaming/dstream/DStream.scala       |   2 +-
 .../streaming/dstream/FileInputDStream.scala    | 291 ++++++++++++-------
 .../spark/streaming/CheckpointSuite.scala       |   2 +-
 .../spark/streaming/InputStreamsSuite.scala     | 106 +++----
 4 files changed, 245 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb0e9b09/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index eabd61d..dbf1ebb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -254,7 +254,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   private[streaming] def remember(duration: Duration) {
-    if (duration != null && duration > rememberDuration) {
+    if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
       rememberDuration = duration
       logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cb0e9b09/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 55d6cf6..5f13fdc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,55 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{ObjectInputStream, IOException}
-import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{IOException, ObjectInputStream}
+
+import scala.collection.mutable
 import scala.reflect.ClassTag
+
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.streaming._
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
+/**
+ * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
+ * files and creates a stream out of them. The way it works as follows.
+ *
+ * At each batch interval, the file system is queried for files in the given directory and
+ * detected new files are selected for that batch. In this case "new" means files that
+ * became visible to readers during that time period. Some extra care is needed to deal
+ * with the fact that files may become visible after they are created. For this purpose, this
+ * class remembers the information about the files selected in past batches for
+ * a certain duration (say, "remember window") as shown in the figure below.
+ *
+ *                      |<----- remember window ----->|
+ * ignore threshold --->|                             |<--- current batch time
+ *                      |____.____.____.____.____.____|
+ *                      |    |    |    |    |    |    |
+ * ---------------------|----|----|----|----|----|----|-----------------------> Time
+ *                      |____|____|____|____|____|____|
+ *                             remembered batches
+ *
+ * The trailing end of the window is the "ignore threshold" and all files whose mod times
+ * are less than this threshold are assumed to have already been selected and are therefore
+ * ignored. Files whose mod times are within the "remember window" are checked against files
+ * that have already been selected. At a high level, this is how new files are identified in
+ * each batch - files whose mod times are greater than the ignore threshold and
+ * have not been considered within the remember window. See the documentation on the method
+ * `isNewFile` for more details.
+ *
+ * This makes some assumptions from the underlying file system that the system is monitoring.
+ * - The clock of the file system is assumed to synchronized with the clock of the machine running
+ *   the streaming app.
+ * - If a file is to be visible in the directory listings, it must be visible within a certain
+ *   duration of the mod time of the file. This duration is the "remember window", which is set to
+ *   1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
+ *   selected as the mod time will be less than the ignore threshold when it becomes visible.
+ * - Once a file is visible, the mod time cannot change. If it does due to appends, then the
+ *   processing semantics are undefined.
+ */
 private[streaming]
 class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
     @transient ssc_ : StreamingContext,
@@ -37,22 +74,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     newFilesOnly: Boolean = true)
   extends InputDStream[(K, V)](ssc_) {
 
+  // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
-  // files found in the last interval
-  private val lastFoundFiles = new HashSet[String]
+  // Initial ignore threshold based on which old, existing files in the directory (at the time of
+  // starting the streaming application) will be ignored or considered
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+
+  /*
+   * Make sure that the information of files selected in the last few batches are remembered.
+   * This would allow us to filter away not-too-old files which have already been recently
+   * selected and processed.
+   */
+  private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+  private val durationToRemember = slideDuration * numBatchesToRemember
+  remember(durationToRemember)
 
-  // Files with mod time earlier than this is ignored. This is updated every interval
-  // such that in the current interval, files older than any file found in the
-  // previous interval will be ignored. Obviously this time keeps moving forward.
-  private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L
+  // Map of batch-time to selected file info for the remembered batches
+  @transient private[streaming] var batchTimeToSelectedFiles =
+    new mutable.HashMap[Time, Array[String]]
+
+  // Set of files that were selected in the remembered batches
+  @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
+
+  // Read-through cache of file mod times, used to speed up mod time lookups
+  @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
+
+  // Timestamp of the last round of finding files
+  @transient private var lastNewFileFindingTime = 0L
 
-  // Latest file mod time seen till any point of time
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
-  @transient private[streaming] var files = new HashMap[Time, Array[String]]
-  @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
-  @transient private var lastNewFileFindingTime = 0L
 
   override def start() { }
 
@@ -68,54 +120,113 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= ignoreTime,
-      "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
-
     // Find new files
-    val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
+    val newFiles = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    if (!newFiles.isEmpty) {
-      lastFoundFiles.clear()
-      lastFoundFiles ++= newFiles
-      ignoreTime = minNewFileModTime
-    }
-    files += ((validTime, newFiles.toArray))
+    batchTimeToSelectedFiles += ((validTime, newFiles))
+    recentlySelectedFiles ++= newFiles
     Some(filesToRDD(newFiles))
   }
 
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldFiles = files.filter(_._1 < (time - rememberDuration))
-    files --= oldFiles.keys
+    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+    batchTimeToSelectedFiles --= oldFiles.keys
+    recentlySelectedFiles --= oldFiles.values.flatten
     logInfo("Cleared " + oldFiles.size + " old files that were older than " +
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
     logDebug("Cleared files are:\n" +
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
     // Delete file mod times that weren't accessed in the last round of getting new files
-    fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
+    fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
 
   /**
-   * Find files which have modification timestamp <= current time and return a 3-tuple of
-   * (new files found, latest modification time among them, files with latest modification time)
+   * Find new files for the batch of `currentTime`. This is done by first calculating the
+   * ignore threshold for file mod times, and then getting a list of files filtered based on
+   * the current batch time and the ignore threshold. The ignore threshold is the max of
+   * initial ignore threshold and the trailing end of the remember window (that is, which ever
+   * is later in time).
    */
-  private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
-    logDebug("Trying to get new files for time " + currentTime)
-    lastNewFileFindingTime = System.currentTimeMillis
-    val filter = new CustomPathFilter(currentTime)
-    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-    val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
-    logInfo("Finding new files took " + timeTaken + " ms")
-    logDebug("# cached file times = " + fileModTimes.size)
-    if (timeTaken > slideDuration.milliseconds) {
-      logWarning(
-        "Time taken to find new files exceeds the batch size. " +
-          "Consider increasing the batch size or reduceing the number of " +
-          "files in the monitored directory."
+  private def findNewFiles(currentTime: Long): Array[String] = {
+    try {
+      lastNewFileFindingTime = System.currentTimeMillis
+
+      // Calculate ignore threshold
+      val modTimeIgnoreThreshold = math.max(
+        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
+        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
       )
+      logDebug(s"Getting new files for time $currentTime, " +
+        s"ignoring files older than $modTimeIgnoreThreshold")
+      val filter = new PathFilter {
+        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
+      }
+      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+      logInfo("Finding new files took " + timeTaken + " ms")
+      logDebug("# cached file times = " + fileToModTime.size)
+      if (timeTaken > slideDuration.milliseconds) {
+        logWarning(
+          "Time taken to find new files exceeds the batch size. " +
+            "Consider increasing the batch size or reducing the number of " +
+            "files in the monitored directory."
+        )
+      }
+      newFiles
+    } catch {
+      case e: Exception =>
+        logWarning("Error finding new files", e)
+        reset()
+        Array.empty
+    }
+  }
+
+  /**
+   * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be
+   * accepted, it has to pass the following criteria.
+   * - It must pass the user-provided file filter.
+   * - It must be newer than the ignore threshold. It is assumed that files older than the ignore
+   *   threshold have already been considered or are existing files before start
+   *   (when newFileOnly = true).
+   * - It must not be present in the recently selected files that this class remembers.
+   * - It must not be newer than the time of the batch (i.e. `currentTime` for which this
+   *   file is being tested. This can occur if the driver was recovered, and the missing batches
+   *   (during downtime) are being generated. In that case, a batch of time T may be generated
+   *   at time T+x. Say x = 5. If that batch T contains file of mod time T+5, then bad things can
+   *   happen. Let's say the selected files are remembered for 60 seconds.  At time t+61,
+   *   the batch of time t is forgotten, and the ignore threshold is still T+1.
+   *   The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
+   *   Hence they can get selected as new files again. To prevent this, files whose mod time is more
+   *   than current batch time are not considered.
+   */
+  private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
+    val pathStr = path.toString
+    // Reject file if it does not satisfy filter
+    if (!filter(path)) {
+      logDebug(s"$pathStr rejected by filter")
+      return false
+    }
+    // Reject file if it was created before the ignore time
+    val modTime = getFileModTime(path)
+    if (modTime <= modTimeIgnoreThreshold) {
+      // Use <= instead of < to avoid SPARK-4518
+      logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
+      return false
     }
-    (newFiles, filter.minNewFileModTime)
+    // Reject file if mod time > current batch time
+    if (modTime > currentTime) {
+      logDebug(s"$pathStr not selected as mod time $modTime > current time $currentTime")
+      return false
+    }
+    // Reject file if it was considered earlier
+    if (recentlySelectedFiles.contains(pathStr)) {
+      logDebug(s"$pathStr already considered")
+      return false
+    }
+    logDebug(s"$pathStr accepted with mod time $modTime")
+    return true
   }
 
   /** Generate one RDD from an array of files */
@@ -132,21 +243,21 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
+  /** Get file mod time from cache or fetch it from the file system */
+  private def getFileModTime(path: Path) = {
+    fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
+  }
+
   private def directoryPath: Path = {
     if (path_ == null) path_ = new Path(directory)
     path_
   }
 
   private def fs: FileSystem = {
-    if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
+    if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
     fs_
   }
 
-  private def getFileModTime(path: Path) = {
-    // Get file mod time from cache or fetch it from the file system
-    fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
-  }
-
   private def reset()  {
     fs_ = null
   }
@@ -155,9 +266,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
-    generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
-    files = new HashMap[Time, Array[String]]
-    fileModTimes = new TimeStampedHashMap[String, Long](true)
+    generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+    recentlySelectedFiles = new mutable.HashSet[String]()
+    fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
 
   /**
@@ -167,11 +279,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   private[streaming]
   class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
 
-    def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+    def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
 
     override def update(time: Time) {
       hadoopFiles.clear()
-      hadoopFiles ++= files
+      hadoopFiles ++= batchTimeToSelectedFiles
     }
 
     override def cleanup(time: Time) { }
@@ -182,7 +294,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           // Restore the metadata in both files and generatedRDDs
           logInfo("Restoring files for time " + t + " - " +
             f.mkString("[", ", ", "]") )
-          files += ((t, f))
+          batchTimeToSelectedFiles += ((t, f))
+          recentlySelectedFiles ++= f
           generatedRDDs += ((t, filesToRDD(f)))
         }
       }
@@ -193,57 +306,25 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }
   }
+}
+
+private[streaming]
+object FileInputDStream {
 
   /**
-   * Custom PathFilter class to find new files that
-   * ... have modification time more than ignore time
-   * ... have not been seen in the last interval
-   * ... have modification time less than maxModTime
+   * Minimum duration of remembering the information of selected files. Files with mod times
+   * older than this "window" of remembering will be ignored. So if new files are visible
+   * within this window, then the file will get selected in the next batch.
    */
-  private[streaming]
-  class CustomPathFilter(maxModTime: Long) extends PathFilter {
+  private val MIN_REMEMBER_DURATION = Minutes(1)
 
-    // Minimum of the mod times of new files found in the current interval
-    var minNewFileModTime = -1L
+  def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
 
-    def accept(path: Path): Boolean = {
-      try {
-        if (!filter(path)) {  // Reject file if it does not satisfy filter
-          logDebug("Rejected by filter " + path)
-          return false
-        }
-        // Reject file if it was found in the last interval
-        if (lastFoundFiles.contains(path.toString)) {
-          logDebug("Mod time equal to last mod time, but file considered already")
-          return false
-        }
-        val modTime = getFileModTime(path)
-        logDebug("Mod time for " + path + " is " + modTime)
-        if (modTime < ignoreTime) {
-          // Reject file if it was created before the ignore time (or, before last interval)
-          logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
-          return false
-        } else if (modTime > maxModTime) {
-          // Reject file if it is too new that considering it may give errors
-          logDebug("Mod time more than ")
-          return false
-        }
-        if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
-          minNewFileModTime = modTime
-        }
-        logDebug("Accepted " + path)
-      } catch {
-        case fnfe: java.io.FileNotFoundException =>
-          logWarning("Error finding new files", fnfe)
-          reset()
-          return false
-      }
-      true
-    }
+  /**
+   * Calculate the number of last batches to remember, such that all the files selected in
+   * at least last MIN_REMEMBER_DURATION duration can be remembered.
+   */
+  def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
+    math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
   }
 }
-
-private[streaming]
-object FileInputDStream {
-  def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb0e9b09/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e5592e5..77ff1ca 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -265,7 +265,7 @@ class CheckpointSuite extends TestSuiteBase {
 
     // Verify whether files created have been recorded correctly or not
     var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
     assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
     assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
     assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb0e9b09/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index fa04fa3..307052a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,9 +28,12 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -38,6 +41,9 @@ import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
 import org.apache.spark.rdd.RDD
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.fs.Path
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
@@ -91,54 +97,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
 
-  test("file input stream") {
-    // Disable manual clock as FileInputDStream does not work with manual clock
-    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-
-    // Set up the streaming context and input streams
-    val testDir = Utils.createTempDir()
-    val ssc = new StreamingContext(conf, batchDuration)
-    val fileStream = ssc.textFileStream(testDir.toString)
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    def output = outputBuffer.flatMap(x => x)
-    val outputStream = new TestOutputStream(fileStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    // Create files in the temporary directory so that Spark Streaming can read data from it
-    val input = Seq(1, 2, 3, 4, 5)
-    val expectedOutput = input.map(_.toString)
-    Thread.sleep(1000)
-    for (i <- 0 until input.size) {
-      val file = new File(testDir, i.toString)
-      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
-      logInfo("Created file " + file)
-      Thread.sleep(batchDuration.milliseconds)
-      Thread.sleep(1000)
-    }
-    val startTime = System.currentTimeMillis()
-    Thread.sleep(1000)
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
-
-    // Verify whether data received by Spark Streaming was as expected
-    logInfo("--------------------------------")
-    logInfo("output, size = " + outputBuffer.size)
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output, size = " + expectedOutput.size)
-    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    // (whether the elements were received one in each interval is not verified)
-    assert(output.toList === expectedOutput.toList)
-
-    Utils.deleteRecursively(testDir)
+  test("file input stream - newFilesOnly = true") {
+    testFileStream(newFilesOnly = true)
+  }
 
-    // Enable manual clock back again for other tests
-    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  test("file input stream - newFilesOnly = false") {
+    testFileStream(newFilesOnly = false)
   }
 
   test("multi-thread receiver") {
@@ -180,7 +144,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     assert(output.sum === numTotalRecords)
   }
 
-  test("queue input stream - oneAtATime=true") {
+  test("queue input stream - oneAtATime = true") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val queue = new SynchronizedQueue[RDD[String]]()
@@ -223,7 +187,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
-  test("queue input stream - oneAtATime=false") {
+  test("queue input stream - oneAtATime = false") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val queue = new SynchronizedQueue[RDD[String]]()
@@ -268,6 +232,50 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       assert(output(i) === expectedOutput(i))
     }
   }
+
+  def testFileStream(newFilesOnly: Boolean) {
+    var ssc: StreamingContext = null
+    val testDir: File = null
+    try {
+      val testDir = Utils.createTempDir()
+      val existingFile = new File(testDir, "0")
+      Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+
+      Thread.sleep(1000)
+      // Set up the streaming context and input streams
+      val newConf = conf.clone.set(
+        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+      ssc = new StreamingContext(newConf, batchDuration)
+      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+      val outputStream = new TestOutputStream(fileStream, outputBuffer)
+      outputStream.register()
+      ssc.start()
+
+      // Create files in the directory
+      val input = Seq(1, 2, 3, 4, 5)
+      input.foreach { i =>
+        Thread.sleep(batchDuration.milliseconds)
+        val file = new File(testDir, i.toString)
+        Files.write(i + "\n", file, Charset.forName("UTF-8"))
+        logInfo("Created file " + file)
+      }
+
+      // Verify that all the files have been read
+      val expectedOutput = if (newFilesOnly) {
+        input.map(_.toString).toSet
+      } else {
+        (Seq(0) ++ input).map(_.toString).toSet
+      }
+      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        assert(outputBuffer.flatten.toSet === expectedOutput)
+      }
+    } finally {
+      if (ssc != null) ssc.stop()
+      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org