Posted to user@avro.apache.org by Elliott Clark <ec...@ngmoco.com> on 2011/10/20 03:20:18 UTC

Reduce Side Combiner

When running a MapReduce job using avro mapred, we're having some issues
with combiners.

When running over a small data set, map-side combiners run and report that
they combined records.
When running over a larger data set, combiners run and report that they
combined 1.4 billion records into 6 million.  However, the reduce phase then
fails with:

2011-10-19 21:37:34,777 WARN org.apache.hadoop.mapred.ReduceTask:
attempt_201109220009_0156_r_000000_0 Merge of the inmemory files threw
an exception: java.io.IOException: Intermediate merge failed
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: org.apache.avro.AvroRuntimeException: No field named rowKey in: null
	at org.apache.avro.reflect.ReflectData.findField(ReflectData.java:194)
	at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:179)
	at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:96)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:102)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:65)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:102)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:57)
	at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:131)
	at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:114)
	at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:179)
	at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1025)
	at org.apache.avro.mapred.HadoopCombiner$PairCollector.collect(HadoopCombiner.java:52)
	at org.apache.avro.mapred.HadoopCombiner$PairCollector.collect(HadoopCombiner.java:40)
	at com.ngmoco.ngpipes.sourcing.NgBucketingEventCountingCombiner.reduce(NgBucketingEventCountingCombiner.java:63)
	at com.ngmoco.ngpipes.sourcing.NgBucketingEventCountingCombiner.reduce(NgBucketingEventCountingCombiner.java:17)
	at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:61)
	at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:30)
	at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1296)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2701)
	... 1 more



rowKey is present only in our output schema.  From reading the code, it
looks like the combiner is using the wrong collector.
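
To make the suspected mismatch concrete, here is a toy model in plain Java. It does not use the Avro or Hadoop APIs: FieldWriter is a hypothetical stand-in for a schema-bound datum writer, and the field names bucket and count are made up (only rowKey comes from the trace above). It is one reading of the stack trace, assuming a combiner result is handed to a writer that was configured with the output schema instead of the intermediate one.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerSchemaSketch {

    /** Toy stand-in for a schema-bound writer: it serializes exactly the fields it was built with. */
    static final class FieldWriter {
        final List<String> fields;

        FieldWriter(List<String> fields) {
            this.fields = fields;
        }

        /** Writes the record as name=value pairs, failing if a schema field is missing from it. */
        String write(Map<String, Object> record) {
            StringBuilder out = new StringBuilder();
            for (String name : fields) {
                if (!record.containsKey(name)) {
                    throw new IllegalStateException(
                        "No field named " + name + " in: " + record.keySet());
                }
                if (out.length() > 0) {
                    out.append(',');
                }
                out.append(name).append('=').append(record.get(name));
            }
            return out.toString();
        }
    }

    public static void main(String[] args) {
        // Hypothetical schemas: only "rowKey" is taken from the original report.
        FieldWriter intermediateWriter = new FieldWriter(Arrays.asList("bucket", "count"));
        FieldWriter outputWriter = new FieldWriter(Arrays.asList("rowKey", "count"));

        // A combiner emits records in the intermediate (map output) shape.
        Map<String, Object> combined = new LinkedHashMap<>();
        combined.put("bucket", "2011-10-19");
        combined.put("count", 42L);

        // Correct pairing: intermediate record, intermediate writer.
        System.out.println(intermediateWriter.write(combined));

        // Suspected bug: the same record handed to the output-schema writer.
        try {
            outputWriter.write(combined);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this model the first write succeeds and the second fails on rowKey, which matches the shape of the exception above. It says nothing about where in avro mapred the wrong writer would be selected.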

With the combiner commented out, everything works.  Running over a smaller
data set also works.  Basically, anything that keeps the code path from
https://issues.apache.org/jira/browse/HADOOP-3226 from running lets the job
succeed.

Any ideas on how to fix this?  The above patch to Hadoop was committed to
trunk without any additional tests, so I'm not sure how to reproduce this
at a small, non-distributed scale for a unit test.

Re: Reduce Side Combiner

Posted by Doug Cutting <cu...@apache.org>.
I've filed a Jira and posted a patch:

https://issues.apache.org/jira/browse/AVRO-944

Can you please tell me whether this patch fixes things for you?

Thanks,

Doug
