Posted to user@crunch.apache.org by Tahir Hameed <ta...@gmail.com> on 2015/09/24 15:46:34 UTC

groupByKey with Avro

Hi,

I have the following case:

PGroupedTable<ImmutableBytesWritable, ABC> o = abcTable
        .parallelDo(new ABCDoFN(),
                Avros.tableOf(Avros.reflects(ImmutableBytesWritable.class),
                        Avros.reflects(ABC.class)))
        .groupByKey();

This raises the following error:

2015-09-24 15:43:08,625 WARN [main]
org.apache.hadoop.mapred.YarnChild: Exception running child :
org.apache.avro.UnresolvedUnionException: Not in union
["null",{"type":"bytes","java-class":"[B"}]: [B@3195e45d
	at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:604)
	at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
	at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:128)
	at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:113)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1146)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
	at org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:41)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
	at com.bol.step.enrichmentdashboard.fn.OffersDoFN.process(OffersDoFN.java:36)
	at com.bol.step.enrichmentdashboard.fn.OffersDoFN.process(OffersDoFN.java:17)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
	at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)



The error above appears only when the groupByKey() method is called. I am
not sure why this is happening. Can someone point me in the right direction?


Tahir

Re: groupByKey with Avro

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Tahir,

I think that the issue here is that Avro reflection isn't up to the
task of encoding/decoding an ImmutableBytesWritable instance.

The PTableType that you supply,
Avros.tableOf(Avros.reflects(ImmutableBytesWritable.class),
Avros.reflects(ABC.class)), specifies how data is serialized between
the map and reduce phases, or between multiple MapReduce jobs.
However, in order to use Avros.reflects, the class being serialized
and deserialized needs to have all of its fields accessible via
reflection (typically via getters and setters).
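
For illustration, here is a minimal sketch (the fields are hypothetical,
not taken from your real ABC class) of the kind of POJO that
Avros.reflects handles cleanly. ImmutableBytesWritable, by contrast,
wraps a raw byte[] (the [B@... value in the stack trace), which appears
to be where the union resolution fails:

    // Hypothetical POJO: a no-arg constructor plus simple fields that
    // Avro reflection can read and write directly.
    public class ABC {
        private String id;
        private long count;

        public ABC() { }  // required by Avro reflection

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }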

I'm guessing that when you're running this in a unit test, you're
using the MemPipeline. The MemPipeline doesn't do any actual
serialization of values, which is why this wouldn't be an issue there.

Probably the easiest way to get around this is to use the
Avros.writables [1] method to define the PType for the
ImmutableBytesWritable class. This would mean that your PTableType
would be defined as follows:

    Avros.tableOf(Avros.writables(ImmutableBytesWritable.class),
                  Avros.reflects(ABC.class))
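
Plugged back into your original snippet, that would look roughly like
this (abcTable, ABCDoFN and ABC are the names from your post):

    PGroupedTable<ImmutableBytesWritable, ABC> o = abcTable
            .parallelDo(new ABCDoFN(),
                    Avros.tableOf(Avros.writables(ImmutableBytesWritable.class),
                            Avros.reflects(ABC.class)))
            .groupByKey();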

Depending on what ABC is (whether or not it's really serializable via
reflection) you could also consider using the WritableTypeFamily (via
the Writables [2] class) instead of the AvroTypeFamily (via Avros).
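
As a rough sketch of that alternative (this assumes ABC itself
implements org.apache.hadoop.io.Writable, which may well not be the
case):

    import org.apache.crunch.types.writable.Writables;

    // Only valid if ABC implements Writable; otherwise stay with Avro.
    PGroupedTable<ImmutableBytesWritable, ABC> o = abcTable
            .parallelDo(new ABCDoFN(),
                    Writables.tableOf(Writables.writables(ImmutableBytesWritable.class),
                            Writables.writables(ABC.class)))
            .groupByKey();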

- Gabriel


1. https://crunch.apache.org/apidocs/0.12.0/org/apache/crunch/types/avro/Avros.html#writables(java.lang.Class)
2. https://crunch.apache.org/apidocs/0.12.0/org/apache/crunch/types/writable/Writables.html

On Thu, Sep 24, 2015 at 3:56 PM, Tahir Hameed <ta...@gmail.com> wrote:
> To add to the above, my unit tests seem to be running fine without any
> exception, but when I run the above lines of code on the cluster, I receive
> the above error.

Re: groupByKey with Avro

Posted by Tahir Hameed <ta...@gmail.com>.
To add to the above, my unit tests seem to be running fine without any
exception, but when I run the above lines of code on the cluster, I receive
the above error.

