Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2019/05/10 10:48:00 UTC

[jira] [Commented] (SPARK-27664) Performance issue with FileStatusCache, while reading from object stores.

    [ https://issues.apache.org/jira/browse/SPARK-27664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837182#comment-16837182 ] 

Apache Spark commented on SPARK-27664:
--------------------------------------

User 'ScrapCodes' has created a pull request for this issue:
https://github.com/apache/spark/pull/24577

> Performance issue with FileStatusCache, while reading from object stores.
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27664
>                 URL: https://issues.apache.org/jira/browse/SPARK-27664
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0, 2.4.3
>            Reporter: Prashant Sharma
>            Priority: Major
>
> In short,
> This issue (i.e. degraded performance) surfaces when the number of files is large (> 100K) and the data is stored on an object store or any other remote storage. The actual issue is that
> everything is inserted as a single entry in the FileStatusCache (a Guava cache), and it does not fit unless the cache is configured to be very large, roughly 4x the listing size. Reason: [https://github.com/google/guava/issues/3462].
>  
> Full story, with possible solutions:
> When we read a directory in Spark with
> {code:java}
> spark.read.parquet("/dir/data/test").limit(1).show()
> {code}
> behind the scenes, Spark fetches the FileStatus objects and caches them inside a FileStatusCache, so that it does not need to refetch them later. Internally, it scans the directory using the listLeafFiles function on the driver.
>  Inside the cache, the entire listing, as an array of FileStatus objects, is inserted as a single entry with the key "/dir/data/test". The default size of this cache is 250MB, and it is configurable. The underlying cache is a Guava cache.
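> As a rough illustration of the shape of what gets cached (a sketch only, not Spark's actual FileStatusCache implementation), the entire leaf-file listing of the path becomes the value of one cache entry:
> {code:scala}
> import org.apache.hadoop.fs.{FileStatus, Path}
>
> // Build a tiny fake listing; with >100K files this single array is tens of MB.
> val leafFiles: Array[FileStatus] = (0 until 3).map { i =>
>   new FileStatus(1024L, false, 1, 128L * 1024 * 1024, 0L,
>     new Path(s"/dir/data/test/part-$i.parquet"))
> }.toArray
>
> // One key ("/dir/data/test") maps to one very large array value.
> val cachedListing: Map[Path, Array[FileStatus]] =
>   Map(new Path("/dir/data/test") -> leafFiles)
> {code}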
> The Guava cache has one interesting property: a single entry can only be as large as
> {code:java}
> maximumSize/concurrencyLevel{code}
> see [https://github.com/google/guava/issues/3462] for details. So, for a cache size of 250MB, a single entry can only be as large as 250MB/4, since the default concurrency level in Guava is 4. That is around 62MB, which is good enough for most datasets, but not for directories with larger listings. The effect is especially evident when such listings come from object stores like Amazon S3 or IBM Cloud Object Storage.
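> The per-segment cap can be seen directly with Guava's builder. A minimal, self-contained sketch (the 250MB and 100MB figures are only illustrative, not taken from Spark's code):
> {code:scala}
> import com.google.common.cache.{CacheBuilder, Weigher}
>
> // 250MB total weight budget, weighed by the size of the cached byte array.
> val cache = CacheBuilder.newBuilder()
>   .weigher[String, Array[Byte]](new Weigher[String, Array[Byte]] {
>     override def weigh(key: String, value: Array[Byte]): Int = value.length
>   })
>   .maximumWeight(250L * 1024 * 1024)
>   // concurrencyLevel is left at its default of 4, so each segment
>   // effectively gets only ~62MB of the 250MB budget.
>   .build[String, Array[Byte]]()
>
> // A single ~100MB entry exceeds maximumWeight / concurrencyLevel and is
> // evicted right away, even though the cache as a whole is nearly empty.
> cache.put("/dir/data/test", new Array[Byte](100 * 1024 * 1024))
> println(cache.getIfPresent("/dir/data/test"))  // most likely null
> {code}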
> So, currently, one can work around this problem by setting the cache size (i.e. `spark.sql.hive.filesourcePartitionFileCacheSize`) very high, since it needs to be more than 4x of what is actually required. But this has a drawback: one either has to start the driver with more memory than necessary, or risk an OOM, because with the size configured at 4x the cache will not evict older entries before the driver runs out of memory.
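> The workaround amounts to something like the following. The 1g value is purely illustrative (the config takes a size in bytes), and since it applies to a shared cache it has to be set before the session is created, e.g. via the session builder or spark-submit --conf:
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Illustrative only: 1g stands in for "well above 4x the expected listing
> // size"; it is not a recommended value.
> val spark = SparkSession.builder()
>   .appName("filestatus-cache-workaround")
>   .config("spark.sql.hive.filesourcePartitionFileCacheSize", 1024L * 1024 * 1024)
>   .getOrCreate()
>
> spark.read.parquet("/dir/data/test").limit(1).show()
> {code}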
> In order to fix this issue, we can take 3 different approaches:
> 1) One stop-gap fix would be to reduce the concurrency level of the Guava cache to just 1; for a small number of very large entries, we do not lose much by doing this.
> 2) An alternative would be to divide the input array into multiple entries in the cache, instead of inserting everything against a single key. This can be done using directories as keys when there are multiple nested directories under the root, but if a user has everything under a single directory this solution does not help either, and we cannot depend on users creating partitions in their Hive/SQL tables.
> 3) One more fix would be to make the concurrency level configurable for those who want to change it, and, while inserting an entry into the cache, split it into `concurrencyLevel` (or even 2x or 3x of it) parts before inserting; a rough sketch of this idea follows after this list. This way the cache performs optimally, and even if there is an eviction, only some of the parts are evicted, as opposed to the entire entry in the current implementation. How many parts are evicted due to size depends on the configured concurrencyLevel. This approach can be taken even without making `concurrencyLevel` configurable.
> The problem with this approach is that the parts in the cache are of no use on their own: if even one part is evicted, all the other parts for that key must be evicted as well, otherwise the results would be wrong.
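> A minimal sketch of option 3, assuming a hypothetical helper (names like `putListing`/`getListing` are made up here, and plain strings stand in for FileStatus objects; this is not the actual Spark patch):
> {code:scala}
> import com.google.common.cache.{Cache, CacheBuilder}
>
> object PartitionedCacheSketch {
>   private val parts = 4  // would mirror the cache's concurrencyLevel
>   private val cache: Cache[String, Array[String]] =
>     CacheBuilder.newBuilder().maximumSize(10000).build[String, Array[String]]()
>
>   // Split one large listing into `parts` slices and cache each slice under a
>   // derived key, so no single entry exceeds the per-segment cap.
>   def putListing(path: String, listing: Array[String]): Unit = {
>     val sliceSize = math.max(1, math.ceil(listing.length.toDouble / parts).toInt)
>     val slices = listing.grouped(sliceSize).toArray.padTo(parts, Array.empty[String])
>     slices.zipWithIndex.foreach { case (slice, i) => cache.put(s"$path#$i", slice) }
>   }
>
>   // All slices must still be present; if any slice was evicted, the whole
>   // listing has to be treated as a miss (the consistency problem noted above).
>   def getListing(path: String): Option[Array[String]] = {
>     val slices = (0 until parts).flatMap(i => Option(cache.getIfPresent(s"$path#$i")))
>     if (slices.length == parts) Some(slices.flatten.toArray) else None
>   }
> }
> {code}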



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org