Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/31 19:13:15 UTC

[12/16] git commit: Changed file stream to not catch any exceptions related to finding new files (FileNotFoundException is still caught and ignored).

Changed file stream to not catch any exceptions related to finding new files (FileNotFoundException is still caught and ignored).
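
The message draws a line between expected and unexpected failures: a file that vanishes while the stream is scanning for new input is normal and ignorable, but anything else should surface rather than be logged and masked. A minimal sketch of where such a narrow catch could live, assuming it sits in the path filter (illustrative Scala only, not Spark's CustomPathFilter; the class name and threshold parameter are made up):

    import java.io.FileNotFoundException
    import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

    // Hypothetical filter: a path that disappears between listing and
    // inspection is simply skipped, while any other failure propagates
    // out of fs.listStatus to the caller.
    class SkipMissingFilter(fs: FileSystem, modTimeFloor: Long) extends PathFilter {
      override def accept(path: Path): Boolean =
        try fs.getFileStatus(path).getModificationTime >= modTimeFloor
        catch { case _: FileNotFoundException => false }
    }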


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/be647191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/be647191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/be647191

Branch: refs/heads/master
Commit: be647191386f4c01e7502776fbdc4884b5cdaac2
Parents: bacc65c
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 12:33:12 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 12:33:12 2013 -0800

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    | 30 +++++++-------------
 1 file changed, 11 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/be647191/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b163b13..2bb6d91 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-
+import java.io.{ObjectInputStream, IOException}
+import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
 
-import scala.collection.mutable.{HashSet, HashMap}
-import scala.reflect.ClassTag
-
-import java.io.{ObjectInputStream, IOException}
 
 private[streaming]
 class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@@ -106,17 +105,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * (new files found, latest modification time among them, files with latest modification time)
    */
   private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
-    try {
-      logDebug("Trying to get new files for time " + currentTime)
-      val filter = new CustomPathFilter(currentTime)
-      val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
-      return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
-    } catch {
-      case e: Exception =>
-        logError("Attempt to get new files failed", e)
-        reset()
-    }
-    (Seq.empty, -1, Seq.empty)
+    logDebug("Trying to get new files for time " + currentTime)
+    val filter = new CustomPathFilter(currentTime)
+    val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+    (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
   }
 
   /** Generate one RDD from an array of files */
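
After this hunk, findNewFiles no longer swallows failures and substitutes the (Seq.empty, -1, Seq.empty) fallback; a listing error now propagates to whatever invokes the method. The sketch below models the net behavior described in the commit message — missing files are still tolerated somewhere in the lookup, everything else reaches the caller — using stand-in names (listNewFiles and the example paths are assumptions, not part of this diff):

    import java.io.FileNotFoundException

    object FindNewFilesSketch {
      // Stand-in for fs.listStatus(path, filter).map(_.getPath.toString).
      def listNewFiles(path: String): Seq[String] =
        if (path == "/missing") throw new FileNotFoundException(path)
        else if (path == "/broken") throw new RuntimeException("listing failed")
        else Seq(path + "/part-00000")

      // Before: every exception was logged and masked by an empty result.
      def findNewFilesBefore(path: String): Seq[String] =
        try listNewFiles(path)
        catch { case e: Exception => println("Attempt to get new files failed: " + e); Seq.empty }

      // After: only the expected missing-path case yields an empty result;
      // a genuine failure reaches the caller.
      def findNewFilesAfter(path: String): Seq[String] =
        try listNewFiles(path)
        catch { case _: FileNotFoundException => Seq.empty }

      def main(args: Array[String]): Unit = {
        println(findNewFilesBefore("/broken"))  // logs, then prints List()
        println(findNewFilesAfter("/missing"))  // prints List()
        println(findNewFilesAfter("/data"))     // prints List(/data/part-00000)
        // findNewFilesAfter("/broken") would now throw RuntimeException.
      }
    }

The upside of the narrower catch is that real problems (permissions, misconfiguration, transient filesystem faults) fail loudly instead of silently producing an empty batch.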