You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-user@hadoop.apache.org by Shai Erera <se...@gmail.com> on 2011/04/14 20:31:50 UTC

Best practices for jobs with large Map output

Hi

I'm running on Hadoop 0.20.2 and I have a job with the following nature:
* Mapper outputs very large records (50 to 200 MB)
* Reducer (single) merges all those records together
* Map output key is a constant (could be a NullWritable, but currently it's
a LongWritable(1))
* Reducer doesn't care about the keys at all

I'm running into OOMs at the Sort phase. Previously, when I was on Hadoop
0.20.1, I ran into OOMs at the Shuffle phase, but that was due to a bug that
was fixed in MAPREDUCE-1182. The OOMs I'm seeing now are from IFile, line
342 in readNextBlock, where it tries to allocate a byte[] the size of the
key+value output and fails. Inspecting the code, I noticed it depends on
io.file.buffer.size -- it allocates a buffer that large which it attempts to
reuse. However, if the map key+value output's size is large, it allocates
another byte[] of that size, which is devastating.

I'm running with mapred.child.java.opts set to -Xmx4096m, and each machine
in my cluster has 16 GB RAM, as well as it's the only job that's running (at
the moment), so there's definitely a lot of RAM to use.

So I limited the output of the Mappers to be of size <64MB, which is my
io.file.buffer.size and now the job succeeds, but I feel that it's not the
best way to deal with it. First, because someone could easily configure the
job I write (it's a library, so someone can do whatever he wants with it) to
have less io.file.buffer.size, or larger map output, and things will break
again. Second, because I think there must be a better way to deal with it.

Searching around I found several parameters that might be related, but not
sure if they'll help much. Of those are controlling the spill percent,
lowering the size of the largest record that can fit in-memory in the
shuffle phase etc. They feel like voodoo to me, but I may be wrong :).

It'd be best if I can avoid the shuffle and sort phase entirely - they are
not needed for this job. I wrote a *NoSorter implements IndexedSorter* in
hope that it will improve things, but it didn't (it replaces QuickSort by
not doing any sort).

Also, I don't understand why do the values of the Reducer need to be read
into memory, if they most likely (unless you really insist) don't
participate in the sort and merge phase, only the keys matter (again, unless
you provide a Comparator for the values). If it must load stuff into memory,
then loading the keys/values in small chunks of say 64K, is better because
the JVM can really struggle to allocate 100MB of consecutive byte[].

Another thing that would work great is if Hadoop just feed the Reducer as
soon as Mappers' outputs are ready. This job does not care about sorting +
merging the outputs, so I could gain a lot of perf back if the Reducer would
kick in immediately, and not only after all Mappers are done.

I couldn't measure it precisely, but of the 50 minutes it took the Reducer
to finish (Mappers finished in 6 minutes !), more than half was spent in the
copy, shuffle and sort phases, which is insane.

Any help / advise / best practice instructions are greatly appreciated. If
any of these are fixed in a newer Hadoop release I'd love to hear about it
too. I cannot upgrade from 0.20.2 at the moment, but it'd be nice to know
those are already addressed.

Shai

Re: Best practices for jobs with large Map output

Posted by Chris Douglas <cd...@apache.org>.
On Mon, Apr 18, 2011 at 3:42 AM, Shai Erera <se...@gmail.com> wrote:
> I ended up doing the following -- my HDFS Mapper creates an index in-memory
> and then serializes the in-memory index into a single file that is stored on
> HDFS (each Mapper serializes to a different file). I use FileSystem API to
> achieve that, so hopefully it's the way to do it. The Mapper outputs a Text
> value which is the location on HDFS. The Reducer then interprets that value
> and reads the file using FileSystem API, and deserialize it into an
> in-memory Lucene index.

Without knowing the format of a Lucene index, I can't say whether this
approach makes sense. Instead of handling the cleanup yourself, you
might consider running the index generation and the concat as separate
parts of your workflow (as Harsh suggested). -C

Re: Best practices for jobs with large Map output

Posted by Shai Erera <se...@gmail.com>.
Thanks for your response Chris !

An index, in this case a Lucene index, is a logical name for a collection of
files. Each Mapper instance generates such index from the input it receives,
and then the Reducer merges all of those indexes together, to output a
single Lucene index.

I ended up doing the following -- my HDFS Mapper creates an index in-memory
and then serializes the in-memory index into a single file that is stored on
HDFS (each Mapper serializes to a different file). I use FileSystem API to
achieve that, so hopefully it's the way to do it. The Mapper outputs a Text
value which is the location on HDFS. The Reducer then interprets that value
and reads the file using FileSystem API, and deserialize it into an
in-memory Lucene index.

I still need to write an OutputCommitter which will get rid of those
serialized-index-files (on HDFS) once the job completes successfully (e.g.
in cleanupJob).

Am I going in the right direction, or should I stop and rethink the
approach?

Shai

On Mon, Apr 18, 2011 at 10:02 AM, Chris Douglas <cd...@apache.org> wrote:

> I don't understand your job, but the Writable interface is just a
> format for record serialization. If your mapper generates
> <URI,offset,length> tuples referencing into data written in HDFS, that
> is sufficient to open the stream in the reducer using the FileSystem
> API. Writing an OutputFormat that interprets that tuple as a range of
> bytes to read from HDFS and write to a single stream should not
> diverge too far from the OutputFormats bundled with Hadoop. You might
> start there.
>
> Again, it's not clear what your goal is or what you mean by "index".
> Are the input records changed before being written by the reduce? Or
> is the purpose of this job only to concatenate index files? -C
>
> On Fri, Apr 15, 2011 at 8:35 PM, Shai Erera <se...@gmail.com> wrote:
> > bq. If you can change your job to handle metadata backed by a store in
> HDFS
> > I have two Mappers, one that works with HDFS and one with GPFS. The GPFS
> one
> > does exactly that -- it stores the indexes in GPFS (which all Mappers and
> > Reducers see, as a shared location) and outputs just the pointer to that
> > location. Then, Hadoop just merges key=LongWritable and value=Text, and
> > indeed it works better (the job runs ~170% faster).
> > The value (index) is a collection of files, though my Writable writes
> them
> > as a single stream, so in essence I can make them look a single file.
> I've
> > never worked with HDFS before (only GPFS), and HDFS is a new requirement.
> > Can you point out some example code / classes I should use to achieve the
> > same trick? Will I need, in my Mapper, to specifically call FileSystem
> API
> > to store the index, or is a Writable enough?
> > Shai
> > On Fri, Apr 15, 2011 at 9:05 PM, Chris Douglas <cd...@apache.org>
> wrote:
> >>
> >> On Fri, Apr 15, 2011 at 9:34 AM, Harsh J <ha...@cloudera.com> wrote:
> >> >> Is there absolutely no way to bypass the shuffle + sort phases? I
> don't
> >> >> mind
> >> >> writing some classes if that's what it takes ...
> >> >
> >> > Shuffle is an essential part of the Map to Reduce transition, it can't
> >> > be 'bypassed' since a Reducer has to fetch all map-outputs to begin
> >> > with. Sort/Group may be made dummy as you had done, but can't be
> >> > disabled altogether AFAIK. The latter has been bought up on the lists
> >> > before, if I remember right; but am not aware of an implementation
> >> > alongside that could do that (just begin reducing merely partitioned,
> >> > unsorted data).
> >>
> >> The sort also effects the partitioning, so completely disabling the
> >> sort (as above) will only work with 1 reducer.
> >>
> >> If only grouping is important, then a bijective f(key) that is
> >> inexpensive to sort is canonical. Though more efficient grouping
> >> methods are possible, in practice this captures most of the possible
> >> performance improvement.
> >>
> >> If neither sorting nor grouping are important, then a comparator that
> >> always asserts that its operands are equal will effect the
> >> partitioning, but each reducer will receive all its records in one
> >> iterator. Note also that the key portion of the record will be
> >> incorrect in the old API.
> >>
> >> However, as Harsh correctly points out, this doesn't appear to be the
> >> bottleneck in your job. The data motion for records of tens or
> >> hundreds of MB is patently inefficient, and OOMs are a regrettable but
> >> relatively minor consequence. If you can change your job to handle
> >> metadata backed by a store in HDFS, then your job can merge the
> >> indices instead of merging GB of record data. In other words, pass a
> >> reference to the record data and not the actual.
> >>
> >> If the job neither sorts nor groups, what is the format for the index?
> >> Instead of a reduce phase, a second, single-map job that concatenates
> >> the output of the first seems better fit (assuming the goal is a
> >> single file). -C
> >
> >
>

Re: Best practices for jobs with large Map output

Posted by Chris Douglas <cd...@apache.org>.
I don't understand your job, but the Writable interface is just a
format for record serialization. If your mapper generates
<URI,offset,length> tuples referencing into data written in HDFS, that
is sufficient to open the stream in the reducer using the FileSystem
API. Writing an OutputFormat that interprets that tuple as a range of
bytes to read from HDFS and write to a single stream should not
diverge too far from the OutputFormats bundled with Hadoop. You might
start there.

Again, it's not clear what your goal is or what you mean by "index".
Are the input records changed before being written by the reduce? Or
is the purpose of this job only to concatenate index files? -C

On Fri, Apr 15, 2011 at 8:35 PM, Shai Erera <se...@gmail.com> wrote:
> bq. If you can change your job to handle metadata backed by a store in HDFS
> I have two Mappers, one that works with HDFS and one with GPFS. The GPFS one
> does exactly that -- it stores the indexes in GPFS (which all Mappers and
> Reducers see, as a shared location) and outputs just the pointer to that
> location. Then, Hadoop just merges key=LongWritable and value=Text, and
> indeed it works better (the job runs ~170% faster).
> The value (index) is a collection of files, though my Writable writes them
> as a single stream, so in essence I can make them look a single file. I've
> never worked with HDFS before (only GPFS), and HDFS is a new requirement.
> Can you point out some example code / classes I should use to achieve the
> same trick? Will I need, in my Mapper, to specifically call FileSystem API
> to store the index, or is a Writable enough?
> Shai
> On Fri, Apr 15, 2011 at 9:05 PM, Chris Douglas <cd...@apache.org> wrote:
>>
>> On Fri, Apr 15, 2011 at 9:34 AM, Harsh J <ha...@cloudera.com> wrote:
>> >> Is there absolutely no way to bypass the shuffle + sort phases? I don't
>> >> mind
>> >> writing some classes if that's what it takes ...
>> >
>> > Shuffle is an essential part of the Map to Reduce transition, it can't
>> > be 'bypassed' since a Reducer has to fetch all map-outputs to begin
>> > with. Sort/Group may be made dummy as you had done, but can't be
>> > disabled altogether AFAIK. The latter has been bought up on the lists
>> > before, if I remember right; but am not aware of an implementation
>> > alongside that could do that (just begin reducing merely partitioned,
>> > unsorted data).
>>
>> The sort also effects the partitioning, so completely disabling the
>> sort (as above) will only work with 1 reducer.
>>
>> If only grouping is important, then a bijective f(key) that is
>> inexpensive to sort is canonical. Though more efficient grouping
>> methods are possible, in practice this captures most of the possible
>> performance improvement.
>>
>> If neither sorting nor grouping are important, then a comparator that
>> always asserts that its operands are equal will effect the
>> partitioning, but each reducer will receive all its records in one
>> iterator. Note also that the key portion of the record will be
>> incorrect in the old API.
>>
>> However, as Harsh correctly points out, this doesn't appear to be the
>> bottleneck in your job. The data motion for records of tens or
>> hundreds of MB is patently inefficient, and OOMs are a regrettable but
>> relatively minor consequence. If you can change your job to handle
>> metadata backed by a store in HDFS, then your job can merge the
>> indices instead of merging GB of record data. In other words, pass a
>> reference to the record data and not the actual.
>>
>> If the job neither sorts nor groups, what is the format for the index?
>> Instead of a reduce phase, a second, single-map job that concatenates
>> the output of the first seems better fit (assuming the goal is a
>> single file). -C
>
>

Re: Best practices for jobs with large Map output

Posted by Shai Erera <se...@gmail.com>.
bq. If you can change your job to handle metadata backed by a store in HDFS

I have two Mappers, one that works with HDFS and one with GPFS. The GPFS one
does exactly that -- it stores the indexes in GPFS (which all Mappers and
Reducers see, as a shared location) and outputs just the pointer to that
location. Then, Hadoop just merges key=LongWritable and value=Text, and
indeed it works better (the job runs ~170% faster).

The value (index) is a collection of files, though my Writable writes them
as a single stream, so in essence I can make them look a single file. I've
never worked with HDFS before (only GPFS), and HDFS is a new requirement.
Can you point out some example code / classes I should use to achieve the
same trick? Will I need, in my Mapper, to specifically call FileSystem API
to store the index, or is a Writable enough?

Shai

On Fri, Apr 15, 2011 at 9:05 PM, Chris Douglas <cd...@apache.org> wrote:

> On Fri, Apr 15, 2011 at 9:34 AM, Harsh J <ha...@cloudera.com> wrote:
> >> Is there absolutely no way to bypass the shuffle + sort phases? I don't
> mind
> >> writing some classes if that's what it takes ...
> >
> > Shuffle is an essential part of the Map to Reduce transition, it can't
> > be 'bypassed' since a Reducer has to fetch all map-outputs to begin
> > with. Sort/Group may be made dummy as you had done, but can't be
> > disabled altogether AFAIK. The latter has been bought up on the lists
> > before, if I remember right; but am not aware of an implementation
> > alongside that could do that (just begin reducing merely partitioned,
> > unsorted data).
>
> The sort also effects the partitioning, so completely disabling the
> sort (as above) will only work with 1 reducer.
>
> If only grouping is important, then a bijective f(key) that is
> inexpensive to sort is canonical. Though more efficient grouping
> methods are possible, in practice this captures most of the possible
> performance improvement.
>
> If neither sorting nor grouping are important, then a comparator that
> always asserts that its operands are equal will effect the
> partitioning, but each reducer will receive all its records in one
> iterator. Note also that the key portion of the record will be
> incorrect in the old API.
>
> However, as Harsh correctly points out, this doesn't appear to be the
> bottleneck in your job. The data motion for records of tens or
> hundreds of MB is patently inefficient, and OOMs are a regrettable but
> relatively minor consequence. If you can change your job to handle
> metadata backed by a store in HDFS, then your job can merge the
> indices instead of merging GB of record data. In other words, pass a
> reference to the record data and not the actual.
>
> If the job neither sorts nor groups, what is the format for the index?
> Instead of a reduce phase, a second, single-map job that concatenates
> the output of the first seems better fit (assuming the goal is a
> single file). -C
>

Re: Best practices for jobs with large Map output

Posted by Chris Douglas <cd...@apache.org>.
On Fri, Apr 15, 2011 at 9:34 AM, Harsh J <ha...@cloudera.com> wrote:
>> Is there absolutely no way to bypass the shuffle + sort phases? I don't mind
>> writing some classes if that's what it takes ...
>
> Shuffle is an essential part of the Map to Reduce transition, it can't
> be 'bypassed' since a Reducer has to fetch all map-outputs to begin
> with. Sort/Group may be made dummy as you had done, but can't be
> disabled altogether AFAIK. The latter has been bought up on the lists
> before, if I remember right; but am not aware of an implementation
> alongside that could do that (just begin reducing merely partitioned,
> unsorted data).

The sort also effects the partitioning, so completely disabling the
sort (as above) will only work with 1 reducer.

If only grouping is important, then a bijective f(key) that is
inexpensive to sort is canonical. Though more efficient grouping
methods are possible, in practice this captures most of the possible
performance improvement.

If neither sorting nor grouping are important, then a comparator that
always asserts that its operands are equal will effect the
partitioning, but each reducer will receive all its records in one
iterator. Note also that the key portion of the record will be
incorrect in the old API.

However, as Harsh correctly points out, this doesn't appear to be the
bottleneck in your job. The data motion for records of tens or
hundreds of MB is patently inefficient, and OOMs are a regrettable but
relatively minor consequence. If you can change your job to handle
metadata backed by a store in HDFS, then your job can merge the
indices instead of merging GB of record data. In other words, pass a
reference to the record data and not the actual.

If the job neither sorts nor groups, what is the format for the index?
Instead of a reduce phase, a second, single-map job that concatenates
the output of the first seems better fit (assuming the goal is a
single file). -C

Re: Best practices for jobs with large Map output

Posted by Harsh J <ha...@cloudera.com>.
Hello Shai,

On Fri, Apr 15, 2011 at 5:45 PM, Shai Erera <se...@gmail.com> wrote:
> The job is an indexing job. Each Mapper emits a small index and the Reducer
> merges all of those indexes together. The Mappers output the index as a
> Writable which serializes it. I guess I could write the Reducer's function
> as a separate class as you suggest, but then I'll need to write a custom
> OutputFormat that will put those indexes on HDFS or somewhere?

I was thinking of a simple Java program that works with HDFS there,
not a Map/Reduce one (although you can tweak Map-only jobs a bit to
run a single mapper alone, which can then go ahead and do the same).
Your Mapper can open one out-file, get a list of all previous job's
output files, and perform the merge reading them one by one. This
would bypass using a Reduce phase.

> That complicates matters for me -- currently, when this job is run as part
> of a sequence of jobs, I can guarantee that if the job succeeds, then the
> indexes are successfully merged, and if it fails, the job should be
> restarted. While that can be achieved with a separate FS-using program as
> you suggest, it complicates matters.

I agree that the suggestion could complicate your workflow a bit.
Although, it is doable by Map-only job as I mentioned right above
(which may make it a bit more acceptable?).

> Is my scenario that extreme? Would you say the common scenario for Hadoop
> are jobs that output tiny objects between Mappers and Reducers?
>
> Would this work much better if I work w/ several Reducers? I'm not sure it
> will because the problem lies, IMO, in Hadoop allocating large consecutive
> chunks of RAM in my case, instead of trying to either stream it or break it
> down to smaller chunks.

Large outputs are alright, but I wouldn't say they are alright for
simple merging since it would all go through the sort phase with about
twice the I/O ultimately. Using multiple reducers can help a bit if
you do not mind partitioned results at the end.

> Is there absolutely no way to bypass the shuffle + sort phases? I don't mind
> writing some classes if that's what it takes ...

Shuffle is an essential part of the Map to Reduce transition, it can't
be 'bypassed' since a Reducer has to fetch all map-outputs to begin
with. Sort/Group may be made dummy as you had done, but can't be
disabled altogether AFAIK. The latter has been bought up on the lists
before, if I remember right; but am not aware of an implementation
alongside that could do that (just begin reducing merely partitioned,
unsorted data).

-- 
Harsh J

Re: Best practices for jobs with large Map output

Posted by Shai Erera <se...@gmail.com>.
Thanks for the prompt response Harsh !

The job is an indexing job. Each Mapper emits a small index and the Reducer
merges all of those indexes together. The Mappers output the index as a
Writable which serializes it. I guess I could write the Reducer's function
as a separate class as you suggest, but then I'll need to write a custom
OutputFormat that will put those indexes on HDFS or somewhere?

That complicates matters for me -- currently, when this job is run as part
of a sequence of jobs, I can guarantee that if the job succeeds, then the
indexes are successfully merged, and if it fails, the job should be
restarted. While that can be achieved with a separate FS-using program as
you suggest, it complicates matters.

Is my scenario that extreme? Would you say the common scenario for Hadoop
are jobs that output tiny objects between Mappers and Reducers?

Would this work much better if I work w/ several Reducers? I'm not sure it
will because the problem lies, IMO, in Hadoop allocating large consecutive
chunks of RAM in my case, instead of trying to either stream it or break it
down to smaller chunks.

Is there absolutely no way to bypass the shuffle + sort phases? I don't mind
writing some classes if that's what it takes ...

Shai

On Thu, Apr 14, 2011 at 9:50 PM, Harsh J <ha...@cloudera.com> wrote:

> Hello Shai,
>
> On Fri, Apr 15, 2011 at 12:01 AM, Shai Erera <se...@gmail.com> wrote:
> > Hi
> > I'm running on Hadoop 0.20.2 and I have a job with the following nature:
> > * Mapper outputs very large records (50 to 200 MB)
> > * Reducer (single) merges all those records together
> > * Map output key is a constant (could be a NullWritable, but currently
> it's
> > a LongWritable(1))
> > * Reducer doesn't care about the keys at all
>
> If I understand right, your single reducer's only work is to merge
> your multiple map's large record emits, and nothing else (It does not
> have 'keys' to worry about), correct?
>
> Why not do this with a normal FS-using program that opens a single
> file to write out map-materialized output files from a Map-only job to
> merge them?
>
> --
> Harsh J
>

Re: Best practices for jobs with large Map output

Posted by Harsh J <ha...@cloudera.com>.
Hello Shai,

On Fri, Apr 15, 2011 at 12:01 AM, Shai Erera <se...@gmail.com> wrote:
> Hi
> I'm running on Hadoop 0.20.2 and I have a job with the following nature:
> * Mapper outputs very large records (50 to 200 MB)
> * Reducer (single) merges all those records together
> * Map output key is a constant (could be a NullWritable, but currently it's
> a LongWritable(1))
> * Reducer doesn't care about the keys at all

If I understand right, your single reducer's only work is to merge
your multiple map's large record emits, and nothing else (It does not
have 'keys' to worry about), correct?

Why not do this with a normal FS-using program that opens a single
file to write out map-materialized output files from a Map-only job to
merge them?

-- 
Harsh J