Posted to mapreduce-user@hadoop.apache.org by Karl Kuntz <kk...@tradebotsystems.com> on 2010/05/10 17:48:45 UTC

sorting to a single output

Hi all,

I'm doing some evaluation using a vanilla 20.2 release on a small
cluster to sort large data sets.  I've looked at the terasort work, but
in my particular case I'm more interested in outputting a single file
than I am in performance.  For testing I'm sorting about 200G worth of
data and attempting to generate one output file.  There seem to be two
immediately apparent approaches:

1) Set number of reduces to 1
2) Do a final concatenation after the MR job (with total order
partitioning) finishes.  A rough sketch of both options is below.
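
Roughly what I have in mind, against the old 0.20 mapred API.  This is
only a sketch, not our actual job - the class name, paths, reducer
count and sampler parameters are placeholders, and option 2 should be
read as an illustration of which calls are involved rather than a
drop-in:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class SortSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SortSketch.class);
    conf.setJobName("sort-sketch");
    // Input/output format and key/value setup omitted here; our real
    // job uses a custom DelimitedText record type.
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    boolean singleReducer = true;   // flip to try option 2
    if (singleReducer) {
      // Option 1: one reducer, so the lone part-00000 is the globally
      // sorted result - but all 200G funnels through a single task.
      conf.setNumReduceTasks(1);
    } else {
      // Option 2: many reducers plus a total order partitioner, then
      // concatenate part-00000..part-NNNNN afterwards.
      conf.setNumReduceTasks(64);   // arbitrary example
      Path partitions = new Path(args[1] + "_partitions");
      TotalOrderPartitioner.setPartitionFile(conf, partitions);
      conf.setPartitionerClass(TotalOrderPartitioner.class);
      InputSampler.writePartitionFile(conf,
          new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10));
    }

    JobClient.runJob(conf);
  }
}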


Is it reasonable to expect a single reduce to handle 200G?  I would
think that since a merge sort is used it should be, though obviously
not a very good use of cluster resources.  Currently my testing
indicates otherwise - the memory required appears to scale with the
size of the output.  When a single reduce's output is ~10G (>20 reduce
tasks) I need 1280M of task memory to avoid exceptions.  If I up the
memory per task to 1536M I can get to ~14G per reduce (>15 reduce
tasks).  The exceptions appear to be thrown during the in-memory merge
in the copy phase; see the stack trace at the bottom of this mail.  We
could model this out and just increase task memory as needed, but at
some point our data sets may still outgrow the available memory.
Also, is this behavior a bug, or just a known limitation of reduce?
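
For reference, these are the reduce-side shuffle/merge settings I
understand to be in play (they could go inside the main() of the
sketch above).  The values shown are what I believe the 0.20 defaults
to be, listed for illustration rather than as a recommendation:

    // Heap fraction used to buffer map outputs during the copy phase:
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
    // Usage level of that buffer that triggers an in-memory merge:
    conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);
    // ...or merge once this many map outputs are held in memory:
    conf.setInt("mapred.inmem.merge.threshold", 1000);
    // Heap fraction retained to feed the reduce directly (0 = spill all):
    conf.setFloat("mapred.job.reduce.input.buffer.percent", 0.0f);
    // Number of streams merged at once in the on-disk merge:
    conf.setInt("io.sort.factor", 10);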

So if a single reduce isn't reasonable (for whatever reason), what's the
best approach to merging multiple outputs such as those in the terasort
test?  

I suppose at some point we may just have to live with multiple files,
but I'd like to avoid that if possible.

Thanks for any input

-Karl.


java.io.IOException: Task: attempt_201005100827_0001_r_000000_0 - The reduce copier failed
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
	at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.io.IOException: java.lang.RuntimeException: java.io.EOFException
	at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:103)
	at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
	at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:136)
	at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
	at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335)
	at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350)
	at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$LocalFSMerger.run(ReduceTask.java:2454)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readByte(DataInputStream.java:250)
	at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298)
	at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319)
	at org.apache.hadoop.io.Text.readFields(Text.java:263)
	at com.tradebotsystems.dfs.io.DelimitedText.readFields(DelimitedText.java:221)
	at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:97)
	... 7 more

	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$LocalFSMerger.run(ReduceTask.java:2458)


Re: sorting to a single output

Posted by Alex Kozlov <al...@cloudera.com>.
Hi Karl,

Even though approach 1 is possible, it's not scalable.  As far as I know,
the Hadoop reducer will run out of memory if you merge big files (I'm not
sure whether that's a 'bug' or a 'limitation', but it was designed this
way).  In practice, you are also likely to run into other problems, like
accessibility and maintenance, if you output a single huge file.  My rule
of thumb is that each file should be 4-5 HDFS blocks (with the default
64M block size that's roughly 256-320M per file, so a 200G sort would end
up as several hundred part files).  There is practically no overhead in
this approach.

You can write a simple Java program to read/merge multiple files, or use
the FsShell command 'hadoop fs -getmerge'.  'hadoop fs -cat' and 'hadoop
fs -text' will also work with multiple files (the former will not work if
the files are compressed).
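
A minimal sketch of the Java route, using FileUtil.copyMerge (which is
roughly what getmerge does, but here keeping the merged result in HDFS).
The paths come from the command line and are placeholders, and I haven't
double-checked whether copyMerge guarantees the part files are
concatenated in name order, so verify that before relying on it for a
globally sorted output:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class MergeParts {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Concatenate everything under the job output directory (args[0])
    // into a single file (args[1]) without deleting the originals.
    FileUtil.copyMerge(fs, new Path(args[0]),
                       fs, new Path(args[1]),
                       false,      // don't delete the source part files
                       conf,
                       null);      // no separator string between files
  }
}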

Alex K
