You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sigurd Knippenberg <si...@knippenberg.com> on 2015/08/31 20:49:59 UTC

Too many open files issue

I am running in a 'too many open files' issue and before I posted this I
have searched on the web to see if anyone had a solution already to my
particular problem but I did not see anything that helped.

I know I can adjust the max open files allowed by the OS but I'd rather fix
the underlaying issue.

In HDFS I have a directory that contains hourly files in a directory
structure like this: directoryname/hourly/2015/06/02/13 (where the numbers
at the end are the date in YYYY/MM/DD/HH format).

I have a Spark job that roughly does the following:

val directories = findModifiedFiles()
directories.foreach(directory => {
    sparkContext.newAPIHadoopFile(directory)
        .filter(...)
        .map(...)
        .reduceByKey(...)
        .foreachPartition(iterator => {
            iterator.foreach(tuple => {
                // send data to kafka
            }
        }
}

If there are only a few directories that have been modified then this works
pretty well. But when I have the job reprocess all the data (I have 350M of
test data that pretty much has data for each hour of each day for a full
year) I run out of file handles.

I executed the test on a test cluster of 2 hadoop slave nodes that each
have the HDFS data node and yarn node manager running.

When I run "lsof -p" on the Spark processes, I see a lot of the following
types of open files:

java    21196 yarn 3268r   REG               8,16       139  533320
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746324_5500.meta

java    21196 yarn 3269r   REG               8,16     15004  533515
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422

java    21196 yarn 3270r   REG               8,16       127  533516
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422_5598.meta

java    21196 yarn 3271r   REG               8,16     15583  534081
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704

java    21196 yarn 3272r   REG               8,16       131  534082
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704_5880.meta

When I watch the process run, I can see that the number of those open files
is ever increasing while it is processing directories until it runs out of
file handles (it then drops to zero and starts up again until it runs out
again but that is due to the fact that Yarn retries running the job). It
basically ends up opening about 4500 file handles to those files per node.

As I said, I know that I can increase the number of open file handles, and
I will do that, but in my opinion it should not be ever increasing. I would
have thought that when I was done with an RDD that Spark would close all
the resources that it opened for them (so that it would close the file
handles after each execution of the directories.foreach loop). I looked if
there was a close() method or something like that for the RDD but couldn't
find that.

Am I doing something that is causing Spark not to close the file handles?
Should I write this job differently?
Thanks,

Sigurd

Re: Too many open files issue

Posted by Sigurd Knippenberg <si...@knippenberg.com>.
I don't think that is the issue. I have it setup to run in a thread pool
but I have set the pool size to 1 for this test until I get this resolved.
I am having some problems with using the Spark web portal since it is
picking a random port and with the way my environment is setup, by time I
have figured out which port it is using the job has finished. But what I
did do is add some logging and I added collecting the RDD record count to
make sure the last logging statements were in fact executed after the RDD
process ran. I added the logging statements in the job flow:

val directories = findModifiedFiles()
directories.foreach(directory => {
    log 'Starting directory processor for $directory'
    rdd = sparkContext.newAPIHadoopFile(directory)
        .filter(...)
        .map(...)
        .reduceByKey(...)

    rdd.foreachPartition(iterator => {
            iterator.foreach(tuple => {
                // send data to kafka
            }
        }

    val count = rdd.count
    log 'Processed $count records for $directory'
    log 'Finished directory processor for $directory'
}

This results in these log lines until the "Too many open files in system"
errors started happening after which it only printed the first log line for
each iteration (as expected since it's throwing an exception).

Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/20

Just to see what would happen if I made the job run slower and had GC run
more frequently (in case it had something to do with that), I added the
following to each loop iteration:
      System.gc()
      Thread.sleep(5000)

But besides making the job run a lot longer it did not change anything.

Sigurd


On Wed, Sep 2, 2015 at 9:40 AM, Saisai Shao <sa...@gmail.com> wrote:

> Here is the code in which NewHadoopRDD register close handler and be
> called when the task is completed (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
> ).
>
> From my understanding, possibly the reason is that this `foreach` code in
> your implementation is not executed Spark job one by one in loop as
> expected, on the contrary all the jobs are submitted to the DAGScheduler
> simultaneously, since each job has no dependency to others, Spark's
> scheduler will unwrap the loop and submit jobs in parallelism, so maybe
> several map stages are running and pending, this makes your node out of
> file handler.
>
> You could check Spark web portal to see if there's several map stages
> running simultaneously, or some of them are running while others are
> pending.
>
> Thanks
> Jerry
>
>
> On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg <sigurd@knippenberg.com
> > wrote:
>
>> Yep. I know. It's was set to 32K when I ran this test. If I bump it to
>> 64K the issue goes away. It still doesn't make sense to me that the Spark
>> job doesn't release its file handles until the end of the job instead of
>> doing that while my loop iterates.
>>
>> Sigurd
>>
>> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran <st...@hortonworks.com>
>> wrote:
>>
>>>
>>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg <si...@knippenberg.com>
>>> wrote:
>>>
>>> I know I can adjust the max open files allowed by the OS but I'd rather
>>> fix the underlaying issue.
>>>
>>>
>>>
>>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>>
>>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>>
>>
>>
>

Re: Too many open files issue

Posted by Saisai Shao <sa...@gmail.com>.
Here is the code in which NewHadoopRDD register close handler and be called
when the task is completed (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
).

>From my understanding, possibly the reason is that this `foreach` code in
your implementation is not executed Spark job one by one in loop as
expected, on the contrary all the jobs are submitted to the DAGScheduler
simultaneously, since each job has no dependency to others, Spark's
scheduler will unwrap the loop and submit jobs in parallelism, so maybe
several map stages are running and pending, this makes your node out of
file handler.

You could check Spark web portal to see if there's several map stages
running simultaneously, or some of them are running while others are
pending.

Thanks
Jerry


On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg <si...@knippenberg.com>
wrote:

> Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K
> the issue goes away. It still doesn't make sense to me that the Spark job
> doesn't release its file handles until the end of the job instead of doing
> that while my loop iterates.
>
> Sigurd
>
> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran <st...@hortonworks.com>
> wrote:
>
>>
>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg <si...@knippenberg.com>
>> wrote:
>>
>> I know I can adjust the max open files allowed by the OS but I'd rather
>> fix the underlaying issue.
>>
>>
>>
>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>
>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>
>
>

Re: Too many open files issue

Posted by Steve Loughran <st...@hortonworks.com>.
ah, now that does sound suspicious...

On 2 Sep 2015, at 14:09, Sigurd Knippenberg <si...@knippenberg.com>> wrote:

Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K the issue goes away. It still doesn't make sense to me that the Spark job doesn't release its file handles until the end of the job instead of doing that while my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran <st...@hortonworks.com>> wrote:

On 31 Aug 2015, at 19:49, Sigurd Knippenberg <si...@knippenberg.com>> wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the underlaying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles



Re: Too many open files issue

Posted by Sigurd Knippenberg <si...@knippenberg.com>.
Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K
the issue goes away. It still doesn't make sense to me that the Spark job
doesn't release its file handles until the end of the job instead of doing
that while my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran <st...@hortonworks.com>
wrote:

>
> On 31 Aug 2015, at 19:49, Sigurd Knippenberg <si...@knippenberg.com>
> wrote:
>
> I know I can adjust the max open files allowed by the OS but I'd rather
> fix the underlaying issue.
>
>
>
> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>
> https://wiki.apache.org/hadoop/TooManyOpenFiles
>

Re: Too many open files issue

Posted by Steve Loughran <st...@hortonworks.com>.
On 31 Aug 2015, at 19:49, Sigurd Knippenberg <si...@knippenberg.com>> wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the underlaying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles