Posted to issues@spark.apache.org by "Prashant Sharma (JIRA)" <ji...@apache.org> on 2019/05/09 11:14:00 UTC

[jira] [Updated] (SPARK-27664) Performance issue with FileStatusCache, while reading from object stores.

     [ https://issues.apache.org/jira/browse/SPARK-27664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashant Sharma updated SPARK-27664:
------------------------------------
    Description: 
In short,

This issue (i.e. degraded performance) surfaces when the number of files is large (> 100K) and the files are stored on an object store or any other remote storage. The actual cause is the following:

The whole listing is inserted as a single entry in the FileStatusCache (a Guava cache), and that entry does not fit unless the cache is configured to be very large, i.e. more than 4X the size the listing needs. Reason: [https://github.com/google/guava/issues/3462].

 

Full story, with possible solutions,

When we read a directory in Spark with,
{code:java}
spark.read.parquet("/dir/data/test").limit(1).show()
{code}
behind the scenes, Spark fetches the FileStatus objects and caches them in a FileStatusCache, so that it does not need to refetch them. Internally, the scan is done by the listLeafFiles function on the driver.
Inside the cache, the entire listing is inserted as an array of FileStatus objects under a single entry, with the key "/dir/data/test". The default size of this cache is 250MB and it is configurable. The underlying cache is a Guava cache.

The Guava cache has one interesting property: a single entry can only be as large as
{code:java}
maximumSize/concurrencyLevel{code}
see [https://github.com/google/guava/issues/3462] for details. So for a cache size of 250MB, a single entry can be at most 250MB/4, since the default concurrency level in Guava is 4. That is around 62MB, which is good enough for most datasets, but it does not work well for directories with larger listings. The effect is especially evident when such listings come from object stores like Amazon S3 or IBM Cloud object store.
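The arithmetic above can be sketched in a few lines of plain Java. This is only an illustration of the `maximumSize/concurrencyLevel` rule quoted from the Guava issue; the class and method names are hypothetical and nothing here calls Guava or Spark.

```java
// Illustrative only: models the per-entry ceiling described above.
// EntryCeiling, maxEntryBytes and fitsAsSingleEntry are hypothetical names.
class EntryCeiling {
    static final long MB = 1024L * 1024L;

    // Largest single entry the cache can hold under the quoted rule.
    static long maxEntryBytes(long cacheBytes, int concurrencyLevel) {
        return cacheBytes / concurrencyLevel;
    }

    // True when a listing of the given size can be kept as one entry.
    static boolean fitsAsSingleEntry(long listingBytes, long cacheBytes, int concurrencyLevel) {
        return listingBytes <= maxEntryBytes(cacheBytes, concurrencyLevel);
    }

    public static void main(String[] args) {
        long cache = 250 * MB; // default cache size mentioned above
        System.out.println(maxEntryBytes(cache, 4) / (double) MB); // 62.5
        System.out.println(fitsAsSingleEntry(100 * MB, cache, 4)); // false
        System.out.println(fitsAsSingleEntry(100 * MB, cache, 1)); // true
    }
}
```

The 100MB listing size is an assumed figure chosen to show a listing that fits in the cache as a whole but not in one quarter of it.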

So, currently one can work around this problem by setting the size of the cache (i.e. `spark.sql.hive.filesourcePartitionFileCacheSize`) very high, since it needs to be much more than 4x what the listing requires.
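As a rough sketch of the workaround, the snippet below computes a lower bound for the cache size from an estimated listing size and prints the corresponding setting. It assumes the setting is interpreted as a byte count (consistent with the 250MB default mentioned above); the 100MB listing size and the helper names are hypothetical, and since the text says the value needs to be much more than 4x, the result should be read as a floor rather than a recommendation.

```java
// Illustrative only: lower bound on the cache size for the workaround.
// CacheSizeWorkaround, minCacheBytes and confLine are hypothetical names.
class CacheSizeWorkaround {
    static final long MB = 1024L * 1024L;

    // The cache must be at least concurrencyLevel times the listing size
    // for the listing to fit as one entry.
    static long minCacheBytes(long listingBytes, int concurrencyLevel) {
        return Math.multiplyExact(listingBytes, (long) concurrencyLevel);
    }

    // Formats the setting as a key=value pair.
    static String confLine(long cacheBytes) {
        return "spark.sql.hive.filesourcePartitionFileCacheSize=" + cacheBytes;
    }

    public static void main(String[] args) {
        long listing = 100 * MB; // assumed size of the cached listing
        // prints spark.sql.hive.filesourcePartitionFileCacheSize=419430400
        System.out.println(confLine(minCacheBytes(listing, 4)));
    }
}
```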

To fix this issue, we can take 3 different approaches,

1) A stop-gap fix is to reduce the concurrency level of the Guava cache to 1. If everything has to go in as one single entry per job, then concurrency is not helpful anyway.

2) The ideal fix would be to divide the input array into multiple entries in the cache, instead of inserting everything under a single key. This can be done using directories as keys when there are multiple nested directories under a directory, but if a user has everything listed under a single dir, this solution does not help either.

3) An even better fix would be to make the concurrency level configurable for those who want to change it, and, while inserting the entry into the cache, to divide it into `concurrencyLevel` (or even 2X or 3X of it) parts before inserting. This way the cache will perform optimally, and even if there is an eviction, it will evict only a part of the entries, as opposed to all the entries in the current implementation. How many entries are evicted due to size depends on the configured concurrencyLevel. This approach can be taken even without making `concurrencyLevel` configurable.
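The splitting step in approach 3 could look roughly like the sketch below. It is not Spark's implementation: it works on a generic list rather than FileStatus objects, and the `SplitListing` class, the `split` method and the `<key>#<index>` key scheme are all hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: splits one large listing into several smaller cache entries.
class SplitListing {
    // Splits items into at most `parts` contiguous chunks of near-equal size,
    // keyed as "<baseKey>#<index>" (a hypothetical key scheme).
    static <T> Map<String, List<T>> split(String baseKey, List<T> items, int parts) {
        if (parts < 1) {
            throw new IllegalArgumentException("parts must be >= 1");
        }
        Map<String, List<T>> out = new LinkedHashMap<>();
        int n = items.size();
        int chunk = (n + parts - 1) / parts; // ceiling division
        for (int i = 0, start = 0; start < n; i++, start += chunk) {
            int end = Math.min(n, start + chunk);
            out.put(baseKey + "#" + i, new ArrayList<>(items.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> listing = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            listing.add("file-" + i); // stand-ins for FileStatus objects
        }
        // Print each derived key with the number of items it holds.
        split("/dir/data/test", listing, 4)
            .forEach((key, part) -> System.out.println(key + " -> " + part.size()));
    }
}
```

A real change would also need the read path to reassemble the parts and to fall back to a fresh listing when any part has been evicted, which this sketch does not cover.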

  was:
In short,

This issue(i.e. degraded performance ) surfaces when the number of files are large > 200K, and is stored on an object store, or any remote storage. The actual issue is due to,

Everything is inserted as a single entry in the FileStatusCache i.e. guava cache, which does not fit unless the cache is configured to be very very large or 4X. Reason: [https://github.com/google/guava/issues/3462].

 

Full story, with possible solutions,

When we read a directory in spark by,
{code:java}
spark.read.parquet("/dir/data/test").limit(1).show()
{code}
behind the scenes, it fetches the FileStatus objects and caches them, inside a FileStatusCache, so that it does not need to refetch these objects. Internally, it scans using listLeafFiles function at driver. 
 Inside the cache, the entire content of the listing as array of FileStatus objects is inserted as a single entry, with key as "/dir/data/test", in the FileStatusCache. The default size of this cache is 250MB and it is configurable. This underlying cache uses guava cache.

The guava cache has one interesting property, i.e. a single entry can only be as large as
{code:java}
maximumSize/concurrencyLevel{code}
see [https://github.com/google/guava/issues/3462], for details. So for a cache size of 250MB, a single entry can be as large as only 250MB/4, since the default concurrency level is 4 in guava. This size is around 62MB, which is good enough for most datasets, but for directories with larger listing, it does not work that well. And the effect of this is especially evident when such listings are for object stores like Amazon s3 or IBM Cloud object store etc..

So, currently one can work around this problem by setting the value of size of the cache (i.e. `spark.sql.hive.filesourcePartitionFileCacheSize`) as very high, as it needs to be much more than 4x of what is required.

In order to fix this issue, we can take 3 different approaches,

1) one stop gap fix can be, reduce the concurrency level of the guava cache to be just 1, because if everything has to be just one single entry per job, then concurrency is not helpful anyway.

2) The ideal fix would be, to divide the input array into multiple entries in the cache, instead of inserting everything against a single key. This can be done using directories as keys, if there are multiple nested directories under a directory, but if a user has everything listed under a single dir, then this solution does not help either. 

3) Even more ideal fix would be, to make concurrency level configurable, for those who want to change it. And while inserting the entry in the cache divide it into the `concurrencyLevel`(or even 2X or 3X of it) number of parts, before inserting. This way cache will perform optimally, and even if there is an eviction, it will evict only a part of the entries, as against all the entries in the current implementation. How many entries are evicted due to size, depends on concurrencyLevel configured. This approach can be taken, even without making `concurrencyLevel` configurable.


> Performance issue with FileStatusCache, while reading from object stores.
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27664
>                 URL: https://issues.apache.org/jira/browse/SPARK-27664
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0, 2.4.3
>            Reporter: Prashant Sharma
>            Priority: Major
>


