You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-user@hadoop.apache.org by Adam Kirsch <ad...@gmail.com> on 2010/02/21 22:35:06 UTC

question about spill/combine

Hi,

I'm new to hadoop and am confused about some serialization issues in
the spill.  My reference for this post is the following snippet from
the O'Reilly Hadoop book:

===================

Each map task has a circular memory buffer that it writes the output
to. The buffer is 100 MB by default, a size which can be tuned by
changing the io.sort.mb property. When the contents of the buffer
reaches a certain threshold size (io.sort.spill.percent, default 0.80,
or 80%) a background thread will start to spill the contents to disk.
Map outputs will continue to be written to the buffer while the spill
takes place, but if the buffer fills up during this time, the map will
block until the spill is complete. Spills are written in round-robin
fashion to the directories specified by the mapred.local.dir property,
in a job-specific subdirectory.

Before it writes to disk, the thread first divides the data into
partitions corresponding to the reducers that they will ultimately be
sent to. Within each partition, the background thread performs an
in-memory sort by key, and if there is a combiner function, it is run
on the output of the sort.

Each time the memory buffer reaches the spill threshold, a new spill
file is created, so after the map task has written its last output
record there could be several spill files. Before the task is
finished, the spill files are merged into a single partitioned and
sorted output file. The configuration property io.sort.factor controls
the maximum number of streams to merge at once; the default is 10.

If a combiner function has been specified, and the number of spills is
at least three (the value of the min.num.spills.for.combine property),
then the combiner is run before the output file is written. Recall
that combiners may be run repeatedly over the input without affecting
the final result. The point is that running combiners makes for a more
compact map output, so there is less data to write to local disk and
to transfer to the reducer.

=========================

The first time I read this, it made perfect sense to me, but then I
started thinking about whether combining or serialization happens
first.  From the text above, it looks like the circular memory buffer
used in the spill stores serialized data in some form, because
otherwise saying that, by default, the buffer is 100MB and the spill
is triggered when it is 80% full is not very meaningful.  (If it just
stored pointers to objects, how would hadoop know how much serialized
data that really represents?)  Of course, the deserialized (before
serialization) contents of the buffer must still be stored someplace,
in order to facilitate the in-memory sort by key.  After the sort is
run and the results are partitioned by key for the reducers, the
combiner may be run (depending on settings) as a kind of
application-specific compression mechanism.  The output of the
combiner is what is actually written to disk (or the compressed
output, if the compression option is used).  (And when the map job
runs, for each reducer, all the files corresponding to that reducer
are merged and the result is sent to the reducer.)

Here's my problem: it looks like all of the records that come out of
the map task are serialized in-memory before the combiner is even run,
and that if the combiner is being used aggressively (I am thinking
about writing jobs where the combiner is run on every spill, and the
value objects for the reducer are all fairly large), the results of
this serialization won't ever make it to disk.  Instead, the output of
the combiner (which runs on the deserialized [before serialization]
outputs from the map task) is ultimately what gets serialized to disk.

Thus, it looks like even if I use the combiner aggressively, I have to
pay for in-memory serialization of every record output by the map
function, which strikes me as very inefficient.  Now, maybe hadoop can
avoid serializing the contents of the map records by using some
heuristic to estimate the serialized size of the map function outputs
that have not yet been combined/written to disk in order to determine
when to trigger the spill, but I see no mention of that.  (It could
even feed an implementation of DataOutput to the Writable that doesn't
actually serialize, but instead just counts the number of bytes that
would be needed for the serialization; this would save a bunch of
memory operations, but it would still have the problem of doing all of
the computation and memory accesses in the Writables necessary for
serialization.)

Is this right?  The natural work-around for this is for me to
implement my own logic for combining in the mapper.  That is, rather
than just doing some small operations in the map function and passing
the information to the OutputCollector, I could keep state,
periodically do my own combining when that state gets to big, and then
only feed the results to the OutputCollector after that.  But that's
very nasty, because mappers are not supposed to accumulate state like
this; the built-in combining functionality is supposed to be used to
deal with exactly this issue.  Really, it seems that hadoop should
provide some way for a developer to configure triggering of the spill
in a more general way than serialized size.

This issue also begs the question of how hadoop does in-memory
serialization, and how well that performs for large objects, which I
will likely have.  I suppose it could use a linked list of byte arrays
of (possibly) increasing sizes or something like that, but if it does
something silly like use a single resizable vector of bytes, large
objects could require lots of extra copies.  (My guess is that hadoop
relies on some standard java utility from the network I/O package for
this, and that the end result is something fairly reasonable, but it
seems worth looking asking.)

Thanks very much,
Adam

Re: question about spill/combine

Posted by Adam Kirsch <ad...@gmail.com>.
Thanks, Matei.  I think what I will end up doing is doing some
"pre-combining" in my mapper class (storing a hashtable of key-value
pairs and using that to combine multiple values for the same key), and
then dumping that into the collect() when the number of keys bigger
than some threshold or the map task ends.  I expect that if I set the
threshold correctly, there won't be any spills (except for the one at
the end of the task).

As for the RawComparators, I'm trying to think of all the places where
they could helpful to get a sense of how things work.  Here's what
I've come up with:
- Merging together multiple spill files in the mapper before sending
to the reducer.  Since I am trying hard to avoid multiple spills, this
shouldn't apply.  Also if combining is going on here the key/value
pairs have to be deserialized anyway.

- Merging the map outputs together in the reducer before feeding to
the reducer.  However, here too everything has to be deserialized
eventually for the reducer, so it seems that there is only a win here
if this merge process requires intermediate results of the merge to be
written to disk.  It seems like in most cases there will only be one
round of merging (most people seem to set io.sort.factor to be pretty
big, so this is not a big deal).  But there is some merging that the
reducer tries to do in memory as map outputs are coming in, and I
suppose this would be helpful there.

- I originally though that when I call collect() in the mapper, even
though the record I pass is immediately serialized, the deserialized
key is still maintained for the shuffle, so the RawComparator is not
needed there.  But it seems that the mapper reuses the key object
between calls, so perhaps this is not the case, and if there is no
RawComparator defined for the key, then the in-memory sort requires
deserialization of keys that were only recently serialized.  Is this
correct?  If so, it is a little suspicious, and I'd be curious if
anyone knew why things were done this way.

Thanks again,
Adam

On Tue, Feb 23, 2010 at 11:24 AM, Matei Zaharia <ma...@eecs.berkeley.edu> wrote:
> Hi Adam,
>
> It looks like map output records are indeed serialized before being combined and written out. I'm not really sure why this is, except perhaps to simplify the code for the case where you don't know the size of the records. Maybe someone more familiar with this part of Hadoop can explain the design decision.
>
> Serialization in Hadoop is, however, customizable. Your key and value types need to implement the Writable interface (or actually WritableComparable for keys), which defines how they are serialized. I believe that it's also possible to speed up sorting by making your keys sortable through byte comparisons and setting the appropriate comparator through JobConf.setOutputKeyComparatorClass (MemcpyRawComparator and ReverseStringComparator could both work for example). The sort is actually the most expensive part of the map-side combine and spill process in many cases.
>
> Matei
>
>
> On Feb 21, 2010, at 1:35 PM, Adam Kirsch wrote:
>
>> Hi,
>>
>> I'm new to hadoop and am confused about some serialization issues in
>> the spill.  My reference for this post is the following snippet from
>> the O'Reilly Hadoop book:
>>
>> ===================
>>
>> Each map task has a circular memory buffer that it writes the output
>> to. The buffer is 100 MB by default, a size which can be tuned by
>> changing the io.sort.mb property. When the contents of the buffer
>> reaches a certain threshold size (io.sort.spill.percent, default 0.80,
>> or 80%) a background thread will start to spill the contents to disk.
>> Map outputs will continue to be written to the buffer while the spill
>> takes place, but if the buffer fills up during this time, the map will
>> block until the spill is complete. Spills are written in round-robin
>> fashion to the directories specified by the mapred.local.dir property,
>> in a job-specific subdirectory.
>>
>> Before it writes to disk, the thread first divides the data into
>> partitions corresponding to the reducers that they will ultimately be
>> sent to. Within each partition, the background thread performs an
>> in-memory sort by key, and if there is a combiner function, it is run
>> on the output of the sort.
>>
>> Each time the memory buffer reaches the spill threshold, a new spill
>> file is created, so after the map task has written its last output
>> record there could be several spill files. Before the task is
>> finished, the spill files are merged into a single partitioned and
>> sorted output file. The configuration property io.sort.factor controls
>> the maximum number of streams to merge at once; the default is 10.
>>
>> If a combiner function has been specified, and the number of spills is
>> at least three (the value of the min.num.spills.for.combine property),
>> then the combiner is run before the output file is written. Recall
>> that combiners may be run repeatedly over the input without affecting
>> the final result. The point is that running combiners makes for a more
>> compact map output, so there is less data to write to local disk and
>> to transfer to the reducer.
>>
>> =========================
>>
>> The first time I read this, it made perfect sense to me, but then I
>> started thinking about whether combining or serialization happens
>> first.  From the text above, it looks like the circular memory buffer
>> used in the spill stores serialized data in some form, because
>> otherwise saying that, by default, the buffer is 100MB and the spill
>> is triggered when it is 80% full is not very meaningful.  (If it just
>> stored pointers to objects, how would hadoop know how much serialized
>> data that really represents?)  Of course, the deserialized (before
>> serialization) contents of the buffer must still be stored someplace,
>> in order to facilitate the in-memory sort by key.  After the sort is
>> run and the results are partitioned by key for the reducers, the
>> combiner may be run (depending on settings) as a kind of
>> application-specific compression mechanism.  The output of the
>> combiner is what is actually written to disk (or the compressed
>> output, if the compression option is used).  (And when the map job
>> runs, for each reducer, all the files corresponding to that reducer
>> are merged and the result is sent to the reducer.)
>>
>> Here's my problem: it looks like all of the records that come out of
>> the map task are serialized in-memory before the combiner is even run,
>> and that if the combiner is being used aggressively (I am thinking
>> about writing jobs where the combiner is run on every spill, and the
>> value objects for the reducer are all fairly large), the results of
>> this serialization won't ever make it to disk.  Instead, the output of
>> the combiner (which runs on the deserialized [before serialization]
>> outputs from the map task) is ultimately what gets serialized to disk.
>>
>> Thus, it looks like even if I use the combiner aggressively, I have to
>> pay for in-memory serialization of every record output by the map
>> function, which strikes me as very inefficient.  Now, maybe hadoop can
>> avoid serializing the contents of the map records by using some
>> heuristic to estimate the serialized size of the map function outputs
>> that have not yet been combined/written to disk in order to determine
>> when to trigger the spill, but I see no mention of that.  (It could
>> even feed an implementation of DataOutput to the Writable that doesn't
>> actually serialize, but instead just counts the number of bytes that
>> would be needed for the serialization; this would save a bunch of
>> memory operations, but it would still have the problem of doing all of
>> the computation and memory accesses in the Writables necessary for
>> serialization.)
>>
>> Is this right?  The natural work-around for this is for me to
>> implement my own logic for combining in the mapper.  That is, rather
>> than just doing some small operations in the map function and passing
>> the information to the OutputCollector, I could keep state,
>> periodically do my own combining when that state gets to big, and then
>> only feed the results to the OutputCollector after that.  But that's
>> very nasty, because mappers are not supposed to accumulate state like
>> this; the built-in combining functionality is supposed to be used to
>> deal with exactly this issue.  Really, it seems that hadoop should
>> provide some way for a developer to configure triggering of the spill
>> in a more general way than serialized size.
>>
>> This issue also begs the question of how hadoop does in-memory
>> serialization, and how well that performs for large objects, which I
>> will likely have.  I suppose it could use a linked list of byte arrays
>> of (possibly) increasing sizes or something like that, but if it does
>> something silly like use a single resizable vector of bytes, large
>> objects could require lots of extra copies.  (My guess is that hadoop
>> relies on some standard java utility from the network I/O package for
>> this, and that the end result is something fairly reasonable, but it
>> seems worth looking asking.)
>>
>> Thanks very much,
>> Adam
>
>

Re: question about spill/combine

Posted by Matei Zaharia <ma...@eecs.berkeley.edu>.
Hi Adam,

It looks like map output records are indeed serialized before being combined and written out. I'm not really sure why this is, except perhaps to simplify the code for the case where you don't know the size of the records. Maybe someone more familiar with this part of Hadoop can explain the design decision.

Serialization in Hadoop is, however, customizable. Your key and value types need to implement the Writable interface (or actually WritableComparable for keys), which defines how they are serialized. I believe that it's also possible to speed up sorting by making your keys sortable through byte comparisons and setting the appropriate comparator through JobConf.setOutputKeyComparatorClass (MemcpyRawComparator and ReverseStringComparator could both work for example). The sort is actually the most expensive part of the map-side combine and spill process in many cases.

Matei


On Feb 21, 2010, at 1:35 PM, Adam Kirsch wrote:

> Hi,
> 
> I'm new to hadoop and am confused about some serialization issues in
> the spill.  My reference for this post is the following snippet from
> the O'Reilly Hadoop book:
> 
> ===================
> 
> Each map task has a circular memory buffer that it writes the output
> to. The buffer is 100 MB by default, a size which can be tuned by
> changing the io.sort.mb property. When the contents of the buffer
> reaches a certain threshold size (io.sort.spill.percent, default 0.80,
> or 80%) a background thread will start to spill the contents to disk.
> Map outputs will continue to be written to the buffer while the spill
> takes place, but if the buffer fills up during this time, the map will
> block until the spill is complete. Spills are written in round-robin
> fashion to the directories specified by the mapred.local.dir property,
> in a job-specific subdirectory.
> 
> Before it writes to disk, the thread first divides the data into
> partitions corresponding to the reducers that they will ultimately be
> sent to. Within each partition, the background thread performs an
> in-memory sort by key, and if there is a combiner function, it is run
> on the output of the sort.
> 
> Each time the memory buffer reaches the spill threshold, a new spill
> file is created, so after the map task has written its last output
> record there could be several spill files. Before the task is
> finished, the spill files are merged into a single partitioned and
> sorted output file. The configuration property io.sort.factor controls
> the maximum number of streams to merge at once; the default is 10.
> 
> If a combiner function has been specified, and the number of spills is
> at least three (the value of the min.num.spills.for.combine property),
> then the combiner is run before the output file is written. Recall
> that combiners may be run repeatedly over the input without affecting
> the final result. The point is that running combiners makes for a more
> compact map output, so there is less data to write to local disk and
> to transfer to the reducer.
> 
> =========================
> 
> The first time I read this, it made perfect sense to me, but then I
> started thinking about whether combining or serialization happens
> first.  From the text above, it looks like the circular memory buffer
> used in the spill stores serialized data in some form, because
> otherwise saying that, by default, the buffer is 100MB and the spill
> is triggered when it is 80% full is not very meaningful.  (If it just
> stored pointers to objects, how would hadoop know how much serialized
> data that really represents?)  Of course, the deserialized (before
> serialization) contents of the buffer must still be stored someplace,
> in order to facilitate the in-memory sort by key.  After the sort is
> run and the results are partitioned by key for the reducers, the
> combiner may be run (depending on settings) as a kind of
> application-specific compression mechanism.  The output of the
> combiner is what is actually written to disk (or the compressed
> output, if the compression option is used).  (And when the map job
> runs, for each reducer, all the files corresponding to that reducer
> are merged and the result is sent to the reducer.)
> 
> Here's my problem: it looks like all of the records that come out of
> the map task are serialized in-memory before the combiner is even run,
> and that if the combiner is being used aggressively (I am thinking
> about writing jobs where the combiner is run on every spill, and the
> value objects for the reducer are all fairly large), the results of
> this serialization won't ever make it to disk.  Instead, the output of
> the combiner (which runs on the deserialized [before serialization]
> outputs from the map task) is ultimately what gets serialized to disk.
> 
> Thus, it looks like even if I use the combiner aggressively, I have to
> pay for in-memory serialization of every record output by the map
> function, which strikes me as very inefficient.  Now, maybe hadoop can
> avoid serializing the contents of the map records by using some
> heuristic to estimate the serialized size of the map function outputs
> that have not yet been combined/written to disk in order to determine
> when to trigger the spill, but I see no mention of that.  (It could
> even feed an implementation of DataOutput to the Writable that doesn't
> actually serialize, but instead just counts the number of bytes that
> would be needed for the serialization; this would save a bunch of
> memory operations, but it would still have the problem of doing all of
> the computation and memory accesses in the Writables necessary for
> serialization.)
> 
> Is this right?  The natural work-around for this is for me to
> implement my own logic for combining in the mapper.  That is, rather
> than just doing some small operations in the map function and passing
> the information to the OutputCollector, I could keep state,
> periodically do my own combining when that state gets to big, and then
> only feed the results to the OutputCollector after that.  But that's
> very nasty, because mappers are not supposed to accumulate state like
> this; the built-in combining functionality is supposed to be used to
> deal with exactly this issue.  Really, it seems that hadoop should
> provide some way for a developer to configure triggering of the spill
> in a more general way than serialized size.
> 
> This issue also begs the question of how hadoop does in-memory
> serialization, and how well that performs for large objects, which I
> will likely have.  I suppose it could use a linked list of byte arrays
> of (possibly) increasing sizes or something like that, but if it does
> something silly like use a single resizable vector of bytes, large
> objects could require lots of extra copies.  (My guess is that hadoop
> relies on some standard java utility from the network I/O package for
> this, and that the end result is something fairly reasonable, but it
> seems worth looking asking.)
> 
> Thanks very much,
> Adam