You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by srowen <gi...@git.apache.org> on 2018/09/28 14:15:50 UTC
[GitHub] spark pull request #22339: [SPARK-17159][STREAM] Significant speed up for ru...
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/22339#discussion_r221267480
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
- val newFileFilter = new PathFilter {
- def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
- }
- val directoryFilter = new PathFilter {
- override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
- }
- val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+ val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+ .filter(_.isDirectory)
+ .map(_.getPath)
val newFiles = directories.flatMap(dir =>
- fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+ fs.listStatus(dir)
+ .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
--- End diff --
Nit: I think the indent is too deep here?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org