Posted to mapreduce-user@hadoop.apache.org by Kevin Burton <bu...@gmail.com> on 2011/12/21 00:55:31 UTC

Performance of direct vs indirect shuffling

The current Hadoop implementation shuffles directly to disk and then those
disk files are eventually requested by the target nodes which are
responsible for doing the reduce() on the intermediate data.

However, this requires 2x more IO than strictly necessary.

If the data were instead shuffled DIRECTLY to the target host, this IO
overhead would be removed.

I believe that any benefits from writing locally (compressing, combining)
and then doing a transfer can be had by simply allocating a buffer (say
250-500MB per map task) and then transferring the data directly.  I don't
think that the savings will be 100% on par with first writing locally, but
remember it's already 2x faster by not having to write to disk... so any
advantage from first shuffling to the local disk would have to be more than
a 100% improvement to win out.
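
To make this concrete, here's a minimal sketch of the kind of per-map buffer
I'm imagining.  This is hypothetical illustration code, not Hadoop's shuffle
implementation; the class name, single-reducer connection, and buffer sizing
are all assumptions on my part:

    // Hypothetical sketch: buffer a map task's output for one reducer and push
    // it straight over the network instead of spilling to a local file first.
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.nio.ByteBuffer;

    public class DirectShuffleBuffer implements AutoCloseable {

        private final ByteBuffer buffer;     // e.g. 250-500MB per map task
        private final Socket reducerSocket;  // open connection to the target reducer

        public DirectShuffleBuffer(String reducerHost, int port, int bufferBytes)
                throws IOException {
            this.buffer = ByteBuffer.allocate(bufferBytes);
            this.reducerSocket = new Socket(reducerHost, port);
        }

        /** Buffer one serialized record; push a chunk over the wire when full.
         *  Assumes individual records are smaller than the buffer. */
        public void emit(byte[] record) throws IOException {
            if (record.length > buffer.remaining()) {
                flush();
            }
            buffer.put(record);
        }

        /** Send the buffered chunk directly to the reducer, skipping local disk. */
        public void flush() throws IOException {
            OutputStream out = reducerSocket.getOutputStream();
            out.write(buffer.array(), 0, buffer.position());
            out.flush();
            buffer.clear();
        }

        @Override
        public void close() throws IOException {
            flush();
            reducerSocket.close();
        }
    }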

However, writing data to the local disk first could in theory have some
practical advantages under certain loads.  I just don't think they matter
in practice, and that direct shuffling is superior.

Anyone have any thoughts here?

Re: Performance of direct vs indirect shuffling

Posted by Kevin Burton <bu...@gmail.com>.
On Tue, Dec 20, 2011 at 4:53 PM, Todd Lipcon <to...@cloudera.com> wrote:

> The advantages of the "pull" based shuffle is fault tolerance - if you
> shuffle to the reducer and then the reducer dies, you have to rerun
> *all* of the earlier maps in the "push" model.
>

You would have the same situation if you aren't replicating the blocks in
the mapper.

In my situation I'm replicating the shuffle data, so it should be a zero-sum
game.

The map jobs are just re-run where the last one failed since the shuffle
data has already been written.

(I should note that I'm working on another MapReduce implementation that
I'm about to open source)...

There are a LOT of problems in the MapReduce space which are themselves
worthy of research papers, and it would be nice to see more published in
this area.


> The advantage of writing to disk is of course that you can have more
> intermediate output than fits in RAM.
>
>
Well, if you're shuffling across the network and you back up due to network
IO, then your map jobs would just run slower.


> In practice, for short jobs, the output might stay entirely in buffer
> cache and never actually hit disk (RHEL by default configures the
> writeback period to 30 seconds when there isn't page cache pressure).
>
>
Or just start to block when memory is exhausted.

Re: Performance of direct vs indirect shuffling

Posted by Todd Lipcon <to...@cloudera.com>.
The advantages of the "pull" based shuffle is fault tolerance - if you
shuffle to the reducer and then the reducer dies, you have to rerun
*all* of the earlier maps in the "push" model.

The advantage of writing to disk is of course that you can have more
intermediate output than fits in RAM.

In practice, for short jobs, the output might stay entirely in buffer
cache and never actually hit disk (RHEL by default configures the
writeback period to 30 seconds when there isn't page cache pressure).

One possible optimization I hope to look into next year is to change
the map output code to push the data to the local TT, which would have
configurable in-memory buffers. Only once those overflow would they
flush to disk. Compared to just using the buffer cache, this has the
advantage that it won't _ever_ write back unless it has to for space
reasons, and it is more predictable to manage. My guess is we could
squeeze some performance out of this, but not tons.
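
Roughly what I have in mind, as an illustrative sketch only (the class name,
config, and spill path are made up; this isn't real TaskTracker code):

    // Illustrative sketch only: the TT keeps map output in a bounded in-memory
    // buffer and writes a spill file only when the buffer would overflow.
    import java.io.ByteArrayOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TaskTrackerOutputBuffer {

        private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
        private final int maxInMemoryBytes;  // the configurable buffer size
        private final String spillPath;      // only used under memory pressure

        public TaskTrackerOutputBuffer(int maxInMemoryBytes, String spillPath) {
            this.maxInMemoryBytes = maxInMemoryBytes;
            this.spillPath = spillPath;
        }

        /** Accept a chunk of map output; spill to disk only if space runs out. */
        public void accept(byte[] mapOutputChunk) throws IOException {
            if (memory.size() + mapOutputChunk.length > maxInMemoryBytes) {
                spillToDisk();
            }
            memory.write(mapOutputChunk);
        }

        /** Append the in-memory contents to the spill file and reset the buffer. */
        private void spillToDisk() throws IOException {
            try (FileOutputStream out = new FileOutputStream(spillPath, true)) {
                memory.writeTo(out);
            }
            memory.reset();
        }
    }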

-Todd

On Tue, Dec 20, 2011 at 3:55 PM, Kevin Burton <bu...@gmail.com> wrote:
> The current Hadoop implementation shuffles directly to disk and then those
> disk files are eventually requested by the target nodes which are
> responsible for doing the reduce() on the intermediate data.
>
> However, this requires 2x more IO than strictly necessary.
>
> If the data were instead shuffled DIRECTLY to the target host, this IO
> overhead would be removed.
>
> I believe that any benefits from writing locally (compressing, combining)
> and then doing a transfer can be had by simply allocating a buffer (say
> 250-500MB per map task) and then transferring the data directly.  I don't
> think that the savings will be 100% on par with first writing locally, but
> remember it's already 2x faster by not having to write to disk... so any
> advantage from first shuffling to the local disk would have to be more than
> a 100% improvement to win out.
>
> However, writing data to the local disk first could in theory have some
> practical advantages under certain loads.  I just don't think they matter
> in practice, and that direct shuffling is superior.
>
> Anyone have any thoughts here?



-- 
Todd Lipcon
Software Engineer, Cloudera

Re: Performance of direct vs indirect shuffling

Posted by Kevin Burton <bu...@gmail.com>.
Twister, another mapred impl, uses a pub/sub based system like ActiveMQ...
Peregrine, the mapred impl I've been working on, just uses HTTP, Netty, and
async IO to do the same thing.

Note that you mention a BUFFER ... just buffering the IO is not enough.
Most jobs will be larger than memory AND you have to transfer the data ...
so use the buffer to compress and combine the data before it is sent.
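
For illustration, here's roughly what I mean by using the buffer to combine
and compress before sending.  This is a toy sketch (a word-count-style
combiner over gzip), not Peregrine or Hadoop code, and the names are made up:

    // Toy sketch: combine records per key in memory, then gzip the batch as it
    // streams out over the connection to the reducer.
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.zip.GZIPOutputStream;

    public class CombiningSendBuffer {

        private final Map<String, Long> combined = new HashMap<String, Long>();

        /** Combine in memory (e.g. word -> count) instead of buffering raw records. */
        public void emit(String key, long value) {
            Long current = combined.get(key);
            combined.put(key, current == null ? value : current + value);
        }

        /** Compress the combined batch while streaming it to the reducer. */
        public void sendTo(OutputStream reducerConnection) throws IOException {
            // Closing the gzip stream also closes the connection; fine for a
            // one-shot transfer in this sketch.
            try (GZIPOutputStream gzip = new GZIPOutputStream(reducerConnection)) {
                for (Map.Entry<String, Long> e : combined.entrySet()) {
                    String line = e.getKey() + "\t" + e.getValue() + "\n";
                    gzip.write(line.getBytes("UTF-8"));
                }
            }
            combined.clear();
        }
    }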

On Tue, Dec 20, 2011 at 10:07 PM, Binglin Chang <de...@gmail.com> wrote:

> One possible optimization I hope to look into next year is to change
>> the map output code to push the data to the local TT, which would have
>> configurable in-memory buffers.
>
>
> Has anyone ever considered a general data transfer service bundled with
> YARN, so that other applications (not just MR) can also benefit from it?
> The data transfer service would look like a real-world mail service, with a
> few simple interfaces: register, send, receive (stream based):
> Mapper & Reducer: Register(LocalMailService, Address(AppId,
> MapperId/ReducerId))
> Mapper: send(LocalMailService, from=Address(AppId, MapperId),
> to=Address(AppId, ReducerId), data=xxx);
> Reducer: recv(LocalMailService, from=Address(AppId, MapperId),
> to=Address(AppId, ReducerId));
>
> LocalMailService manages a big (configurable) buffer, so it can cache map
> outputs or dump them to disk when memory runs out;
> LocalMailService can start transferring data from source to dest once both
> addresses are registered;
> If source & dest are on the same machine, there will be no network
> transfer;
> The "to" address can be multiple (broadcast, or like a mail group); this is
> useful for 1:N data transfers (binary/side-data distribution), and the
> service could use P2P for this kind of work (much better than -cacheFile).
>
> Just an idea, if anyone is interested.
>
>
>
> On Wed, Dec 21, 2011 at 10:12 AM, Kevin Burton <bu...@gmail.com> wrote:
>
>>
>>> We've discussed 'push' v/s 'pull' shuffle multiple times and each time
>>> turned away due to complexities in MR1. With MRv2 (YARN) this would be much
>>> more doable.
>>>
>>>
>> Ah.... gotcha. This is what I expected as well.  It would be interesting
>> to see a list of changes like this in MR1 vs MR2 to see what could
>> POTENTIALLY happen in the future should everyone get spare time.
>>
>>
>>> So, to really not do any disk i/o during the shuffle you'd need very
>>> large amounts of RAM...
>>>
>>>
>> Why is that?  I'm not suggesting buffering it *all*, but sending it
>> directly as it is generated.
>>
>> I think there should be a SMALL amount of buffer for combining and
>> compressing the data, though.  Normally something like 250-500MB per
>> mapper, but this is when running, say, a 250GB job, so this buffer is just
>> to reduce the IO sent to the remote node.
>>
>>> Also, currently, the shuffle is effected by the reduce task. This has two
>>> major benefits:
>>> # The 'pull' can only be initiated after the reduce is scheduled. The
>>> 'push' model would be hampered if the reduce hasn't been started.
>>>
>>
>> I've gone over this problem a number of times.  The way I'm handling it
>> is that every map attempt is recorded and only successful maps actually
>> have their data reduced.  You end up having MORE intermediate data if
>> machines are crashing, but it's only CHUNK_SIZE per crash, and even in
>> LARGE clusters with lots of crashes this won't end up being a significant
>> percentage of the data.
>>
>>
>>> # The 'pull' is more resilient to failure of a single reduce. In the
>>> push model, it's harder to deal with a reduce failing after a push
>>> from the map.
>>>
>>>
>> I don't see how this would be the case ... I'm replicating all the
>> shuffle data ... so if a reducer crashes I just start up a new one.
>>
>> There IS the problem of whether we replicate the intermediate data from
>> the reducer but this can be a configuration option...
>>
>>
>>> Again, with MR2 we could experiment with push v/s pull where it makes
>>> sense (small jobs etc.). I'd love to mentor/help someone interested in
>>> putting cycles into it.
>>>
>>>
>> I'm going to be doing a ton of work in this area and I'll publish it if I
>> come across anything interesting.
>>
>>
>>
>


-- 

Founder/CEO Spinn3r.com <http://spinn3r.com/>

Location: *San Francisco, CA*
Skype: *burtonator*

Skype-in: *(415) 871-0687*

Re: Performance of direct vs indirect shuffling

Posted by Binglin Chang <de...@gmail.com>.
>
> One possible optimization I hope to look into next year is to change
> the map output code to push the data to the local TT, which would have
> configurable in-memory buffers.


Has anyone ever considered a general data transfer service bundled with
YARN, so that other applications (not just MR) can also benefit from it?
The data transfer service would look like a real-world mail service, with a
few simple interfaces: register, send, receive (stream based):
Mapper & Reducer: Register(LocalMailService, Address(AppId,
MapperId/ReducerId))
Mapper: send(LocalMailService, from=Address(AppId, MapperId),
to=Address(AppId, ReducerId), data=xxx);
Reducer: recv(LocalMailService, from=Address(AppId, MapperId),
to=Address(AppId, ReducerId));

LocalMailService manages a big (configurable) buffer, so it can cache map
outputs or dump them to disk when memory runs out;
LocalMailService can start transferring data from source to dest once both
addresses are registered;
If source & dest are on the same machine, there will be no network transfer;
The "to" address can be multiple (broadcast, or like a mail group); this is
useful for 1:N data transfers (binary/side-data distribution), and the
service could use P2P for this kind of work (much better than -cacheFile).

Just an idea, if anyone is interested.
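
To sketch the interface in (hypothetical) Java, something like the
following; none of this exists in YARN, it just restates the idea above in
code, and all names and signatures are made up:

    // Hypothetical interface sketch of the "mail service" idea.
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;

    public interface DataTransferService {

        /** An endpoint address: (applicationId, taskId), like a mailing address. */
        final class Address {
            public final String appId;
            public final String taskId;  // mapper or reducer id
            public Address(String appId, String taskId) {
                this.appId = appId;
                this.taskId = taskId;
            }
        }

        /** Both sides register; transfer can start once source and dest exist. */
        void register(Address self) throws IOException;

        /** Push a stream of bytes from one address to one or more destinations
         *  (multiple destinations cover broadcast / side-data distribution). */
        void send(Address from, List<Address> to, InputStream data) throws IOException;

        /** Receive the stream that was sent from a given source to this address. */
        InputStream recv(Address from, Address to) throws IOException;
    }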



On Wed, Dec 21, 2011 at 10:12 AM, Kevin Burton <bu...@gmail.com> wrote:

>
>> We've discussed 'push' v/s 'pull' shuffle multiple times and each time
>> turned away due to complexities in MR1. With MRv2 (YARN) this would be much
>> more doable.
>>
>>
> Ah.... gotcha. This is what I expected as well.  It would be interesting
> to see a list of changes like this in MR1 vs MR2 to see what could
> POTENTIALLY happen in the future should everyone get spare time.
>
>
>> So, to really not do any disk i/o during the shuffle you'd need very
>> large amounts of RAM...
>>
>>
> Why is that?  I'm not suggesting buffering it *all*, but sending it
> directly as it is generated.
>
> I think there should be a SMALL amount of buffer for combining and
> compressing the data, though.  Normally something like 250-500MB per
> mapper, but this is when running, say, a 250GB job, so this buffer is just
> to reduce the IO sent to the remote node.
>
>> Also, currently, the shuffle is effected by the reduce task. This has two
>> major benefits:
>> # The 'pull' can only be initiated after the reduce is scheduled. The
>> 'push' model would be hampered if the reduce hasn't been started.
>>
>
> I've gone over this problem a number of times.  The way I'm handling it is
> that every map attempt is recorded and only successful maps actually have
> their data reduced.  You end up having MORE intermediate data if machines
> are crashing, but it's only CHUNK_SIZE per crash, and even in LARGE
> clusters with lots of crashes this won't end up being a significant
> percentage of the data.
>
>
>> # The 'pull' is more resilient to failure of a single reduce. In the push
>> model, it's harder to deal with a reduce failing after a push from the
>> map.
>>
>>
> I don't see how this would be the case ... I'm replicating all the shuffle
> data ... so if a reducer crashes I just start up a new one.
>
> There IS the problem of whether we replicate the intermediate data from
> the reducer but this can be a configuration option...
>
>
>> Again, with MR2 we could experiment with push v/s pull where it makes
>> sense (small jobs etc.). I'd love to mentor/help someone interested in
>> putting cycles into it.
>>
>>
> I'm going to be doing a ton of work in this area and I'll publish it if I
> come across anything interesting.
>
>
>

Re: Performance of direct vs indirect shuffling

Posted by Kevin Burton <bu...@gmail.com>.
> We've discussed 'push' v/s 'pull' shuffle multiple times and each time
> turned away due to complexities in MR1. With MRv2 (YARN) this would be much
> more doable.
>
>
Ah.... gotcha. This is what I expected as well.  It would be interesting to
see a list of changes like this in MR1 vs MR2 to see what could POTENTIALLY
happen in the future should everyone get spare time.


> So, to really not do any disk i/o during the shuffle you'd need very large
> amounts of RAM...
>
>
Why is that?  I'm not suggesting buffering it *all*, but sending it directly
as it is generated.

I think there should be a SMALL amount of buffer for combining and
compressing the data, though.  Normally something like 250-500MB per mapper,
but this is when running, say, a 250GB job, so this buffer is just to reduce
the IO sent to the remote node.

> Also, currently, the shuffle is effected by the reduce task. This has two
> major benefits:
> # The 'pull' can only be initiated after the reduce is scheduled. The
> 'push' model would be hampered if the reduce hasn't been started.
>

I've gone over this problem a number of times.  The way I'm handling it is
that every map attempt is recorded and only successful maps actually have
their data reduced.  You end up having MORE intermediate data if machines
are crashing, but it's only CHUNK_SIZE per crash, and even in LARGE
clusters with lots of crashes this won't end up being a significant
percentage of the data.
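
To put a rough number on it (these are my own illustrative assumptions, not
measurements): with a 128MB chunk size and, say, 100 crashed map attempts
over the life of a job, the extra intermediate data is about 100 x 128MB =
~12.8GB, which is roughly 5% of a 250GB job.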


> # The 'pull' is more resilient to failure of a single reduce. In the push
> model, it's harder to deal with a reduce failing after a push from the
> map.
>
>
I don't see how this would be the case ... I'm replicating all the shuffle
data ... so if a reducer crashes I just start up a new one.

There IS the problem of whether we replicate the intermediate data from the
reducer but this can be a configuration option...


> Again, with MR2 we could experiment with push v/s pull where it makes
> sense (small jobs etc.). I'd love to mentor/help someone interested in
> putting cycles into it.
>
>
I'm going to be doing a ton of work in this area and I'll publish it if I
come across anything interesting.

Re: Performance of direct vs indirect shuffling

Posted by Arun C Murthy <ac...@hortonworks.com>.
On Dec 20, 2011, at 3:55 PM, Kevin Burton wrote:

> The current Hadoop implementation shuffles directly to disk and then those disk files are eventually requested by the target nodes which are responsible for doing the reduce() on the intermediate data.
> 
> However, this requires 2x more IO than strictly necessary.
> 
> If the data were instead shuffled DIRECTLY to the target host, this IO overhead would be removed.
> 

We've discussed 'push' v/s 'pull' shuffle multiple times and each time turned away due to complexities in MR1. With MRv2 (YARN) this would be much more doable.

IAC...

A single reducer, in typical (well-designed?) applications, processes multiple gigabytes of data across thousands of maps.

So, to really not do any disk i/o during the shuffle you'd need very large amounts of RAM...
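
As a rough illustration (the numbers here are hypothetical): a reducer
pulling from 2,000 maps at 5MB per map partition is ~10GB of shuffle input
for a single reduce, and with several reduces running per node that's far
more than you'd want to pin in RAM.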

Also, currently, the shuffle is effected by the reduce task. This has two major benefits:
# The 'pull' can only be initiated after the reduce is scheduled. The 'push' model would be hampered if the reduce hasn't been started.
# The 'pull' is more resilient to failure of a single reduce. In the push model, it's harder to deal with a reduce failing after a push from the map.

Again, with MR2 we could experiment with push v/s pull where it makes sense (small jobs etc.). I'd love to mentor/help someone interested in putting cycles into it.

Arun