You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Kannan Rajah <kr...@maprtech.com> on 2015/04/14 07:01:24 UTC

Using memory mapped file for shuffle

DiskStore.getBytes uses memory mapped files if the length is more than a
configured limit. This code path is used during map side shuffle in
ExternalSorter. I want to know if its possible for the length to exceed the
limit in the case of shuffle. The reason I ask is in the case of Hadoop,
each map task is supposed to produce only data that can fit within the
task's configured max memory. Otherwise it will result in OOM. Is the
behavior same in Spark or the size of data generated by a map task can
exceed what can be fitted in memory.

  if (length < minMemoryMapBytes) {
    val buf = ByteBuffer.allocate(length.toInt)
    ....
  } else {
    Some(channel.map(MapMode.READ_ONLY, offset, length))
  }

--
Kannan

Re: Using memory mapped file for shuffle

Posted by Sandy Ryza <sa...@cloudera.com>.
Spark currently doesn't allocate any memory off of the heap for shuffle
objects.  When the in-memory data gets too large, it will write it out to a
file, and then merge spilled filed later.

What exactly do you mean by store shuffle data in HDFS?

-Sandy

On Tue, Apr 14, 2015 at 10:15 AM, Kannan Rajah <kr...@maprtech.com> wrote:

> Sandy,
> Can you clarify how it won't cause OOM? Is it anyway to related to memory
> being allocated outside the heap - native space? The reason I ask is that I
> have a use case to store shuffle data in HDFS. Since there is no notion of
> memory mapped files, I need to store it as a byte buffer. I want to make
> sure this will not cause OOM when the file size is large.
>
>
> --
> Kannan
>
> On Tue, Apr 14, 2015 at 9:07 AM, Sandy Ryza <sa...@cloudera.com>
> wrote:
>
>> Hi Kannan,
>>
>> Both in MapReduce and Spark, the amount of shuffle data a task produces
>> can exceed the tasks memory without risk of OOM.
>>
>> -Sandy
>>
>> On Tue, Apr 14, 2015 at 6:47 AM, Imran Rashid <ir...@cloudera.com>
>> wrote:
>>
>>> That limit doesn't have anything to do with the amount of available
>>> memory.  Its just a tuning parameter, as one version is more efficient
>>> for
>>> smaller files, the other is better for bigger files.  I suppose the
>>> comment
>>> is a little better in FileSegmentManagedBuffer:
>>>
>>>
>>> https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L62
>>>
>>> On Tue, Apr 14, 2015 at 12:01 AM, Kannan Rajah <kr...@maprtech.com>
>>> wrote:
>>>
>>> > DiskStore.getBytes uses memory mapped files if the length is more than
>>> a
>>> > configured limit. This code path is used during map side shuffle in
>>> > ExternalSorter. I want to know if its possible for the length to
>>> exceed the
>>> > limit in the case of shuffle. The reason I ask is in the case of
>>> Hadoop,
>>> > each map task is supposed to produce only data that can fit within the
>>> > task's configured max memory. Otherwise it will result in OOM. Is the
>>> > behavior same in Spark or the size of data generated by a map task can
>>> > exceed what can be fitted in memory.
>>> >
>>> >   if (length < minMemoryMapBytes) {
>>> >     val buf = ByteBuffer.allocate(length.toInt)
>>> >     ....
>>> >   } else {
>>> >     Some(channel.map(MapMode.READ_ONLY, offset, length))
>>> >   }
>>> >
>>> > --
>>> > Kannan
>>> >
>>>
>>
>>
>

Re: Using memory mapped file for shuffle

Posted by Kannan Rajah <kr...@maprtech.com>.
Sandy,
Can you clarify how it won't cause OOM? Is it anyway to related to memory
being allocated outside the heap - native space? The reason I ask is that I
have a use case to store shuffle data in HDFS. Since there is no notion of
memory mapped files, I need to store it as a byte buffer. I want to make
sure this will not cause OOM when the file size is large.


--
Kannan

On Tue, Apr 14, 2015 at 9:07 AM, Sandy Ryza <sa...@cloudera.com> wrote:

> Hi Kannan,
>
> Both in MapReduce and Spark, the amount of shuffle data a task produces
> can exceed the tasks memory without risk of OOM.
>
> -Sandy
>
> On Tue, Apr 14, 2015 at 6:47 AM, Imran Rashid <ir...@cloudera.com>
> wrote:
>
>> That limit doesn't have anything to do with the amount of available
>> memory.  Its just a tuning parameter, as one version is more efficient for
>> smaller files, the other is better for bigger files.  I suppose the
>> comment
>> is a little better in FileSegmentManagedBuffer:
>>
>>
>> https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L62
>>
>> On Tue, Apr 14, 2015 at 12:01 AM, Kannan Rajah <kr...@maprtech.com>
>> wrote:
>>
>> > DiskStore.getBytes uses memory mapped files if the length is more than a
>> > configured limit. This code path is used during map side shuffle in
>> > ExternalSorter. I want to know if its possible for the length to exceed
>> the
>> > limit in the case of shuffle. The reason I ask is in the case of Hadoop,
>> > each map task is supposed to produce only data that can fit within the
>> > task's configured max memory. Otherwise it will result in OOM. Is the
>> > behavior same in Spark or the size of data generated by a map task can
>> > exceed what can be fitted in memory.
>> >
>> >   if (length < minMemoryMapBytes) {
>> >     val buf = ByteBuffer.allocate(length.toInt)
>> >     ....
>> >   } else {
>> >     Some(channel.map(MapMode.READ_ONLY, offset, length))
>> >   }
>> >
>> > --
>> > Kannan
>> >
>>
>
>

Re: Using memory mapped file for shuffle

Posted by Sandy Ryza <sa...@cloudera.com>.
Hi Kannan,

Both in MapReduce and Spark, the amount of shuffle data a task produces can
exceed the tasks memory without risk of OOM.

-Sandy

On Tue, Apr 14, 2015 at 6:47 AM, Imran Rashid <ir...@cloudera.com> wrote:

> That limit doesn't have anything to do with the amount of available
> memory.  Its just a tuning parameter, as one version is more efficient for
> smaller files, the other is better for bigger files.  I suppose the comment
> is a little better in FileSegmentManagedBuffer:
>
>
> https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L62
>
> On Tue, Apr 14, 2015 at 12:01 AM, Kannan Rajah <kr...@maprtech.com>
> wrote:
>
> > DiskStore.getBytes uses memory mapped files if the length is more than a
> > configured limit. This code path is used during map side shuffle in
> > ExternalSorter. I want to know if its possible for the length to exceed
> the
> > limit in the case of shuffle. The reason I ask is in the case of Hadoop,
> > each map task is supposed to produce only data that can fit within the
> > task's configured max memory. Otherwise it will result in OOM. Is the
> > behavior same in Spark or the size of data generated by a map task can
> > exceed what can be fitted in memory.
> >
> >   if (length < minMemoryMapBytes) {
> >     val buf = ByteBuffer.allocate(length.toInt)
> >     ....
> >   } else {
> >     Some(channel.map(MapMode.READ_ONLY, offset, length))
> >   }
> >
> > --
> > Kannan
> >
>

Re: Using memory mapped file for shuffle

Posted by Imran Rashid <ir...@cloudera.com>.
That limit doesn't have anything to do with the amount of available
memory.  Its just a tuning parameter, as one version is more efficient for
smaller files, the other is better for bigger files.  I suppose the comment
is a little better in FileSegmentManagedBuffer:

https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L62

On Tue, Apr 14, 2015 at 12:01 AM, Kannan Rajah <kr...@maprtech.com> wrote:

> DiskStore.getBytes uses memory mapped files if the length is more than a
> configured limit. This code path is used during map side shuffle in
> ExternalSorter. I want to know if its possible for the length to exceed the
> limit in the case of shuffle. The reason I ask is in the case of Hadoop,
> each map task is supposed to produce only data that can fit within the
> task's configured max memory. Otherwise it will result in OOM. Is the
> behavior same in Spark or the size of data generated by a map task can
> exceed what can be fitted in memory.
>
>   if (length < minMemoryMapBytes) {
>     val buf = ByteBuffer.allocate(length.toInt)
>     ....
>   } else {
>     Some(channel.map(MapMode.READ_ONLY, offset, length))
>   }
>
> --
> Kannan
>