Posted to issues@spark.apache.org by "Gil Vernik (JIRA)" <ji...@apache.org> on 2018/08/22 05:27:00 UTC
[jira] [Resolved] (SPARK-25155) Streaming from storage doesn't work when no directories exists
[ https://issues.apache.org/jira/browse/SPARK-25155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gil Vernik resolved SPARK-25155.
--------------------------------
Resolution: Cannot Reproduce
> Streaming from storage doesn't work when no directories exists
> --------------------------------------------------------------
>
> Key: SPARK-25155
> URL: https://issues.apache.org/jira/browse/SPARK-25155
> Project: Spark
> Issue Type: Bug
> Components: DStreams, Structured Streaming
> Affects Versions: 2.3.1
> Reporter: Gil Vernik
> Priority: Minor
>
> I have an issue related to `org.apache.spark.streaming.dstream.FileInputDStream`, specifically its `findNewFiles` method.
> Streaming from the given path is supposed to pick up new files only (based on the previous run's timestamp). However, the code in Spark first obtains the directories, then finds new files within each directory. Here is the relevant code:
> val directoryFilter = new PathFilter {
>   override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
> }
> val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
> val newFiles = directories.flatMap(dir =>
>   fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
>
> This is not optimal, as it always requires two listing operations. In addition, it appears to be buggy.
> I have an S3 bucket “mydata” with objects “a.csv” and “b.csv”. I noticed that fs.globStatus(“s3a://mydata/”, directoryFilter).map(_.getPath) returned 0 directories, so “a.csv” and “b.csv” were not picked up by Spark.
> I tried making the path “s3a://mydata/*”, but that didn't work either.
> I experienced the same problematic behavior with the local file system when trying to stream from “/Users/streaming/*”.
> I suggest changing the code in Spark to first perform a listing without the directoryFilter, which does not seem needed at all. The code could be:
> val directoriesOrFiles = fs.globStatus(directoryPath).map(_.getPath)
> The flow would then be, for each entry in directoriesOrFiles:
> * If it is a data object: Spark applies newFileFilter to it
> * If it is a directory: the existing code performs an additional listing at the directory level
> This way it picks up both files at the root of the path and the contents of its directories.
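> To illustrate the proposed flow, here is a minimal sketch that stands in for the Hadoop FileSystem calls with java.nio.file so it is self-contained; the names findNewFiles, newFileFilter, and cutoffMillis are illustrative stand-ins for the real members of FileInputDStream, not the actual Spark API:
>
> import java.nio.file.{Files, Path, Paths}
> import scala.jdk.CollectionConverters._
>
> // Stand-in for FileInputDStream's newFileFilter: accept files modified
> // at or after the previous batch's timestamp (hypothetical cutoff).
> def newFileFilter(p: Path, cutoffMillis: Long): Boolean =
>   Files.getLastModifiedTime(p).toMillis >= cutoffMillis
>
> // Proposed single-pass flow: list the path once without a directory
> // filter, then handle plain files and directories differently.
> def findNewFiles(root: Path, cutoffMillis: Long): Seq[String] = {
>   val entries = Files.list(root).iterator().asScala.toSeq
>   entries.flatMap { entry =>
>     if (Files.isDirectory(entry))
>       // existing behavior: perform an additional listing inside the directory
>       Files.list(entry).iterator().asScala
>         .filter(p => !Files.isDirectory(p) && newFileFilter(p, cutoffMillis))
>         .map(_.toString).toSeq
>     else if (newFileFilter(entry, cutoffMillis))
>       // new behavior: files at the root of the path are picked up too
>       Seq(entry.toString)
>     else Seq.empty
>   }
> }
>
> With this flow, a path containing only “a.csv” and “b.csv” (no subdirectories) still yields both files, which the directory-first code misses.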
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org