You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/31 19:13:14 UTC

[11/16] git commit: Removed slack time in file stream and added better handling of failures due to FileNotFound exceptions.

Removed slack time in file stream and added better handling of failures due to FileNotFound exceptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bacc65cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bacc65cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bacc65cf

Branch: refs/heads/master
Commit: bacc65cf28b9f95b129e9adede43f684f2c5ced3
Parents: d4dfab5
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 10:18:46 2013 +0000
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 10:18:46 2013 +0000

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    | 59 +++++++-------------
 .../spark/streaming/CheckpointSuite.scala       |  6 --
 .../spark/streaming/InputStreamsSuite.scala     |  6 --
 3 files changed, 21 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bacc65cf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index d6514a1..b163b13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -40,9 +40,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
-  // Max attempts to try if listing files fail
-  val MAX_ATTEMPTS = 10
-
   // Latest file mod time seen till any point of time
   private val prevModTimeFiles = new HashSet[String]()
   private var prevModTime = 0L
@@ -109,19 +106,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * (new files found, latest modification time among them, files with latest modification time)
    */
   private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
-    logDebug("Trying to get new files for time " + currentTime)
-    var attempts = 0
-    while (attempts < MAX_ATTEMPTS) {
-      attempts += 1
-      try {
-        val filter = new CustomPathFilter(currentTime)
-        val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
-        return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
-      } catch {
-        case ioe: IOException =>
-          logWarning("Attempt " + attempts + " to get new files failed", ioe)
-          reset()
-      }
+    try {
+      logDebug("Trying to get new files for time " + currentTime)
+      val filter = new CustomPathFilter(currentTime)
+      val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+      return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+    } catch {
+      case e: Exception =>
+        logError("Attempt to get new files failed", e)
+        reset()
     }
     (Seq.empty, -1, Seq.empty)
   }
@@ -193,22 +186,17 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * been seen before (i.e. the file should not be in lastModTimeFiles)
    */
   private[streaming]
-  class CustomPathFilter(currentTime: Long) extends PathFilter {
+  class CustomPathFilter(maxModTime: Long) extends PathFilter {
     // Latest file mod time seen in this round of fetching files and its corresponding files
     var latestModTime = 0L
     val latestModTimeFiles = new HashSet[String]()
 
-    // Creating an RDD from a HDFS file immediately after the file is created sometime returns
-    // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older
-    // than slack time from current time is considered for processing.
-    val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong
-    val maxModTime = currentTime - slackTime
-
     def accept(path: Path): Boolean = {
-      if (!filter(path)) {  // Reject file if it does not satisfy filter
-        logDebug("Rejected by filter " + path)
-        return false
-      } else {              // Accept file only if
+      try {
+        if (!filter(path)) {  // Reject file if it does not satisfy filter
+          logDebug("Rejected by filter " + path)
+          return false
+        }
         val modTime = fs.getFileStatus(path).getModificationTime()
         logDebug("Mod time for " + path + " is " + modTime)
         if (modTime < prevModTime) {
@@ -228,8 +216,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         }
         latestModTimeFiles += path.toString
         logDebug("Accepted " + path)
-        return true
+      } catch {
+        case fnfe: java.io.FileNotFoundException => 
+          logWarning("Error finding new files", fnfe)
+          reset()
+          return false
       }
+      return true
     }
   }
 }
@@ -237,14 +230,4 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 private[streaming]
 object FileInputDStream {
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
-
-  // Disable slack time (i.e. set it to zero)
-  private[streaming] def disableSlackTime() {
-    System.setProperty("spark.streaming.fileStream.slackTime", "0")
-  }
-
-  // Restore default value of slack time
-  private[streaming] def restoreSlackTime() {
-    System.clearProperty("spark.streaming.fileStream.slackTime")
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bacc65cf/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0347cc1..4e25c95 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -200,9 +200,6 @@ class CheckpointSuite extends TestSuiteBase {
     val clockProperty = System.getProperty("spark.streaming.clock")
     System.clearProperty("spark.streaming.clock")
 
-    // Disable slack time of file stream when testing with local file system
-    FileInputDStream.disableSlackTime()
-
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -303,9 +300,6 @@ class CheckpointSuite extends TestSuiteBase {
     // Enable manual clock back again for other tests
     if (clockProperty != null)
       System.setProperty("spark.streaming.clock", clockProperty)
-
-    // Restore the default slack time
-    FileInputDStream.restoreSlackTime()
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bacc65cf/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index e506c95..5fa14ad 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -152,9 +152,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Disable manual clock as FileInputDStream does not work with manual clock
     System.clearProperty("spark.streaming.clock")
 
-    // Disable slack time of file stream when testing with local file system
-    FileInputDStream.disableSlackTime()
-
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     val ssc = new StreamingContext(master, framework, batchDuration)
@@ -199,9 +196,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Enable manual clock back again for other tests
     System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-
-    // Restore the default slack time
-    FileInputDStream.restoreSlackTime()
   }