You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:23:39 UTC

[jira] [Updated] (SPARK-11441) HadoopFsRelation is not scalable in number of files read/written

     [ https://issues.apache.org/jira/browse/SPARK-11441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-11441:
---------------------------------
    Labels: bulk-closed  (was: )

> HadoopFsRelation is not scalable in number of files read/written
> ----------------------------------------------------------------
>
>                 Key: SPARK-11441
>                 URL: https://issues.apache.org/jira/browse/SPARK-11441
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: koert kuipers
>            Priority: Major
>              Labels: bulk-closed
>
> HadoopFsRelation includes a fileStatusCache which holds information on all the datafiles (part files) for the data source in the driver program.
> It is not unusual to be reading from 100k+ or even 1mm part files, in which case filling up this cache will take a very long time (days?) and require a lot of memory. See for example:
> https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201510.mbox/%3CCAG+ckK-FvWK=1B2JqC4S+zaz2zVkQehvoS9Myo0ssMGPC5-3SQ@mail.gmail.com%3E
> This is not the kind of behavior you would expect of a driver program. Also HadoopFsRelation passes this large list of part files into:
> def buildScan(inputFiles: Array[FileStatus]): RDD[Row]
> Almost all implementations of HadoopFsRelation do the following inside buildScan:
> FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
> This means an array of potentially millions of items now gets stored in the JobConf which will be broadcast. I have not found any errors about this on mailing list but i believe this is simply because nobody with a large number of inputFiles has gotten this far.
> Generally when using Hadoop InputFormats there should never be a need to list all the part files driver side. It seems the reason it is done here is to facilitate a process in ParquetRelation driver side that creates a merged data schema. I wonder if its really necessary to look at all the part files for this, or if some assumption can be made that at least all the part files in a directory have the same schema (which would reduce the size of the problem by a factor 100 or so).
> At the very least it seems to be that the caching of files is parquet specific and does not belong in HadoopFsRelation. And buildScan should just use the data paths (so directories if one wants to read all part files in a directory) as it did before SPARK-7673 / PR #6225
> I ran into this issue myself with spark-avro, which also does not handle the input of part files in buildScan well. Spark-avro actually tries to create an RDD (and jobConf broadcast) per part file, which is not scalable even for 1k part files. Note that it is difficult for spark-avro to create an RDD per data directory (as it probably should) since the dataPaths have been lost now that the inputFiles is passed into buildScan instead. This to me again confirms the change in buildScan is troubling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org