Posted to user@crunch.apache.org by Jonathan Natkins <na...@cloudera.com> on 2012/12/07 02:14:00 UTC

Looking for some guidance in building a basic Avro pipeline

So I've been futzing with Crunch a bit, and trying to understand how to
build a pipeline that outputs Avro data files. Roughly, I'm doing something
along these lines:

    Schema.Parser schemaParser = new Schema.Parser();
    final Schema avroObjSchema = schemaParser.parse(schemaJsonString);

    AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
        avroObjSchema,
        new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
            MyAvroObject.class, avroObjSchema));

    PCollection<MyAvroObject> words = logs.parallelDo(
        new DoFn<String, MyAvroObject>() {
          public void process(String line, Emitter<MyAvroObject> emitter) {
            emitter.emit(convertStringToAvroObj(line));
          }
        }, avroType);

However, this results in a class cast exception:

Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
    at java.lang.Class.asSubclass(Class.java:3039)
    at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
    at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
    at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
    at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
    at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)

Anybody have any thoughts? There's got to be a magical incantation that I
have slightly off.

Re: Looking for some guidance in building a basic Avro pipeline

Posted by Josh Wills <jw...@cloudera.com>.
Thanks. I think the core issue here is an assumption in MapReduce (and in
Crunch) that once the values in an Iterable<V> have been processed in a
reduce step, they are gone and cannot be processed again. That is why
passing along the PType for the grouped table type didn't make sense as
part of the processing pipeline: you couldn't serialize the Iterable<V> to
disk, so the output of a groupByKey operation could only be processed once.
Allowing the Iterable<V> to be processed multiple times in a single job
requires adding spillable collections.

J
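
The single-pass constraint Josh describes can be illustrated without any Crunch or Hadoop classes. In this minimal Java sketch, `OneShotIterable` is a hypothetical stand-in for the reducer-side `Iterable<V>`: it hands out its iterator once and rejects a second traversal. `GroupBuffering.materialize` (also hypothetical) copies the values into a list so they can be read repeatedly. A real spillable collection would presumably also spill to disk for large groups; this sketch keeps everything on the heap.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a reducer's Iterable<V>: the values are streamed
// once, so asking for a second iterator is an error.
final class OneShotIterable<V> implements Iterable<V> {
  private final Iterator<V> source;
  private boolean consumed = false;

  OneShotIterable(Iterator<V> source) {
    this.source = source;
  }

  @Override
  public Iterator<V> iterator() {
    if (consumed) {
      throw new IllegalStateException("values already consumed");
    }
    consumed = true;
    return source;
  }
}

final class GroupBuffering {
  // Copies every value into an in-memory list during the single allowed pass,
  // so the copy can be traversed as many times as needed afterwards.
  static <V> List<V> materialize(Iterable<V> values) {
    List<V> buffer = new ArrayList<>();
    for (V value : values) {
      buffer.add(value);
    }
    return buffer;
  }

  public static void main(String[] args) {
    Iterable<String> group = new OneShotIterable<>(List.of("a", "b", "c").iterator());
    List<String> buffered = materialize(group);  // the one pass over the original

    // The buffered copy can be re-read freely.
    for (int pass = 1; pass <= 2; pass++) {
      System.out.println("pass " + pass + ": " + String.join(",", buffered));
    }

    // A second pass over the original values is rejected.
    try {
      group.iterator();
    } catch (IllegalStateException e) {
      System.out.println("second pass rejected: " + e.getMessage());
    }
  }
}
```

The trade-off is memory: buffering makes multiple passes possible only as long as a group fits on the heap, which is the gap spillable collections would close.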


On Tue, Dec 11, 2012 at 1:26 PM, Jonathan Natkins <na...@cloudera.com> wrote:

> Cool, JIRA filed: https://issues.apache.org/jira/browse/CRUNCH-129
>
>
>
> On Tue, Dec 11, 2012 at 1:21 PM, Josh Wills <jw...@cloudera.com> wrote:
>
>> No, you're not-- I think that's a bug. Switching to "parallelDo" instead
>> of "by" on the groupedData object will work fine, but we should make sure
>> the by() operation works on grouped tables.
>>
>>
>> On Tue, Dec 11, 2012 at 1:18 PM, Jonathan Natkins <na...@cloudera.com> wrote:
>>
>>> Alright, I'm back for more. This time, I'm trying to perform a group by
>>> with Avro data. What I've currently got is this:
>>>
>>>     PGroupedTable<String, MyAvroObject> processedData = data.parallelDo(
>>>         new DoFn<String, Pair<String, MyAvroObject>>() {
>>>           public void process(String line, Emitter<Pair<String, MyAvroObject>> emitter) {
>>>             String key = getKey(line);
>>>             MyAvroObject value = convertToAvroObject(line);
>>>             emitter.emit(Pair.of(key, value));
>>>           }
>>>         },
>>>         Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
>>>         .groupByKey(3);
>>>
>>>     PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
>>>         processedData.by(new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
>>>             @Override
>>>             public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
>>>               MyAvroGroup group = new MyAvroGroup();
>>>               group.objects = Lists.<MyAvroObject>newArrayList();
>>>
>>>               for (MyAvroObject obj : input.second()) {
>>>                 group.objects.add(obj);
>>>               }
>>>
>>>               return group;
>>>             }
>>>           },
>>>           Avros.specifics(MyAvroGroup.class));
>>>
>>> I think this is all pretty sane, but I'm getting an exception when the
>>> pipeline attempts to run the by():
>>>
>>> 12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>> Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>>     at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
>>>     at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
>>>     at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)
>>>
>>> Am I doing something obviously wrong?
>>>
>>> Thanks,
>>> Natty
>>>
>>>
>>>
>>>
>>> On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <na...@cloudera.com> wrote:
>>>
>>>> To bring things full circle, the core issue I was having was caused by
>>>> the fact that I was writing the data in the wrong way. Instead of
>>>>
>>>> pipeline.writeTextFile(words, args[1]);
>>>>
>>>> I should have been using
>>>>
>>>> pipeline.write(words, To.avroFile(args[1]));
>>>>
>>>> As Josh noted, writeTextFile was attempting to write my data out as a
>>>> String, but I wasn't giving it an object that was easy to turn into a
>>>> String, which resulted in an exception. Changing it to write to an Avro
>>>> file solved those issues.
>>>>
>>>> Thanks, Josh!
>>>>
>>>>
>>>>
>>>> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <jw...@cloudera.com> wrote:
>>>>
>>>>> Hey Natty,
>>>>>
>>>>> Reply inlined.
>>>>>
>>>>>
>>>>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>>>
>>>>>> Hey Josh,
>>>>>>
>>>>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>>>>> assumes that I've got a Java file that Avro generated for me, which I don't
>>>>>> have. I can certainly go through the trouble of getting that file, but what
>>>>>> I've got currently is a POJO that I'm associating with a JSON Avro schema.
>>>>>> It's a perfectly valid use case, and as far as I can tell, from what's
>>>>>> provided by the Avros utility class, it should be supported. So here's my
>>>>>> question:
>>>>>>
>>>>>
>>>>> Interesting-- I had not hit that use case for Avro before. For a POJO,
>>>>> I would just use the reflection APIs, which are available via
>>>>> Avros.reflects.
>>>>>
>>>>>
>>>>>>
>>>>>> Is the Avros.generics issue a bug? It seems to me that the T of
>>>>>> PType<T> has to implement Writable, and in the case of the return type of
>>>>>> Avros.generics, this is not the case.
>>>>>>
>>>>>
>>>>> There's no requirement for the PType<T> to be a Writable, or even an
>>>>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets you
>>>>> create PType<T> that depend on other PTypes, which is how Crunch handles
>>>>> things like protocol buffers/thrift/jackson-style object serializations.
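As a rough illustration of the idea behind `PTypes.derived` (a type for `T` defined by an existing type for `S` plus a conversion function in each direction), here is a JDK-only sketch. `Codec`, `Temperature` and `DerivedCodecDemo` are hypothetical names, and `Codec` is a deliberately tiny stand-in rather than Crunch's `PType` interface; only the composition pattern is meant to carry over.

```java
import java.util.function.Function;

// Hypothetical minimal serializer interface (not Crunch's PType API).
interface Codec<T> {
  String encode(T value);

  T decode(String wire);

  // Builds a codec for R from a codec for S plus conversions in both directions,
  // the same idea as the PTypes.derived helper mentioned in the thread.
  static <R, S> Codec<R> derived(Function<S, R> inputFn, Function<R, S> outputFn, Codec<S> base) {
    return new Codec<R>() {
      @Override
      public String encode(R value) {
        return base.encode(outputFn.apply(value));  // R -> S, then S -> wire
      }

      @Override
      public R decode(String wire) {
        return inputFn.apply(base.decode(wire));    // wire -> S, then S -> R
      }
    };
  }
}

// Hypothetical domain type with no serialization logic of its own.
final class Temperature {
  final double celsius;

  Temperature(double celsius) {
    this.celsius = celsius;
  }
}

final class DerivedCodecDemo {
  // Base codec: strings are already in wire form.
  static final Codec<String> STRINGS = new Codec<String>() {
    @Override
    public String encode(String value) {
      return value;
    }

    @Override
    public String decode(String wire) {
      return wire;
    }
  };

  // Temperature gets a codec purely by converting to and from String.
  static final Codec<Temperature> TEMPERATURES = Codec.derived(
      (String s) -> new Temperature(Double.parseDouble(s)),
      (Temperature t) -> Double.toString(t.celsius),
      STRINGS);

  public static void main(String[] args) {
    String wire = TEMPERATURES.encode(new Temperature(21.5));
    System.out.println(wire);                              // 21.5
    System.out.println(TEMPERATURES.decode(wire).celsius); // 21.5
  }
}
```

The derived codec contains no serialization logic of its own and simply delegates to `base`, which mirrors Josh's point that `T` itself need not be a Writable or an Avro record.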
>>>>>
>>>>> I'm just taking a closer look at the Exception that was thrown, and it
>>>>> looks to me like the problem is occurring at the end of the pipeline, where
>>>>> you're calling pipeline.writeTextFile (not included in the code snippet
>>>>> posted). Crunch has to convert the PType to something that can be converted
>>>>> to a Writable impl-- if you try to write an Avro object to the
>>>>> TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks
>>>>> to me like in this case, Crunch can't figure out how to turn MyAvroObject
>>>>> into a Writable instance for writing to the TextOutputFormat.
>>>>>
>>>>>
>>>>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>>>>> necessary hoops exist.
>>>>>>
>>>>>
>>>>> One way to fix this would be to update writeTextFile to force
>>>>> conversion of any non-string that was passed into it into a String via an
>>>>> auxiliary MapFn-- I'm not sure why I didn't do that in the first place.
>>>>> What do you think?
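The `AvroWrapper@feedbeef` output mentioned above is what Java's default `Object.toString()` produces: the class name, an `@`, and a hexadecimal hash code. Josh's proposed fix, converting each element to a `String` with an auxiliary function before writing text, can be sketched in plain Java as follows. `toTextLines`, `OpaqueRecord` and `ReadableRecord` are hypothetical names; in Crunch itself the conversion would be a `MapFn`, as Josh suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical record with no toString() override, so it prints as ClassName@hexHash.
final class OpaqueRecord {
  final String word;
  OpaqueRecord(String word) { this.word = word; }
}

// Hypothetical record that defines its own text form.
final class ReadableRecord {
  final String word;
  ReadableRecord(String word) { this.word = word; }

  @Override
  public String toString() { return "word=" + word; }
}

final class TextOutputSketch {
  // Turns every element into a line of text with an explicit conversion function,
  // so the text output only ever receives Strings.
  static <T> List<String> toTextLines(List<T> items, Function<? super T, String> toText) {
    List<String> lines = new ArrayList<>(items.size());
    for (T item : items) {
      lines.add(toText.apply(item));
    }
    return lines;
  }

  public static void main(String[] args) {
    List<OpaqueRecord> opaque = List.of(new OpaqueRecord("hi"));
    // Falls back to Object.toString(): class name, '@', hex hash code.
    System.out.println(toTextLines(opaque, String::valueOf));
    // An explicit conversion function gives readable output: [hi]
    System.out.println(toTextLines(opaque, r -> r.word));
    // A type with a sensible toString() can use the generic fallback: [word=hi]
    System.out.println(toTextLines(List.of(new ReadableRecord("hi")), String::valueOf));
  }
}
```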
>>>>>
>>>>>
>>>>>> Thanks,
>>>>>> Natty
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Did you look at Avros.specifics?
>>>>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <na...@cloudera.com> wrote:
>>>>>>>
>>>>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(),
>>>>>>>> and then I modify my code to use GenericData.Records. Those Records still
>>>>>>>> don't implement the Writable interface, so I'm still getting a class cast
>>>>>>>> exception. Did I do something totally wrong?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>>>>>>
>>>>>>>>> Well, the problem with that is that I really want to work with my
>>>>>>>>> objects, rather than use Avros.generics, because then I'm forced to treat
>>>>>>>>> everything as a GenericData.Record. It's just a pain in the butt.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <jo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You don't want to create an AvroType yourself, you want to call
>>>>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a Class
>>>>>>>>>> object.
>>>>>>>>>>
>>>>>>>>>> Interesting though, I would still want that case to work
>>>>>>>>>> correctly.
>>>>>>>>>>
>>>>>>>>>> Josh
>>>>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <na...@cloudera.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm doing
>>>>>>>>>>> something along these lines:
>>>>>>>>>>>
>>>>>>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>>>>     final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>>>>
>>>>>>>>>>>     AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>>>>>>         avroObjSchema,
>>>>>>>>>>>         new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>>>>             MyAvroObject.class, avroObjSchema));
>>>>>>>>>>>
>>>>>>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(
>>>>>>>>>>>         new DoFn<String, MyAvroObject>() {
>>>>>>>>>>>           public void process(String line, Emitter<MyAvroObject> emitter) {
>>>>>>>>>>>             emitter.emit(convertStringToAvroObj(line));
>>>>>>>>>>>           }
>>>>>>>>>>>         }, avroType);
>>>>>>>>>>>
>>>>>>>>>>> However, this results in a class cast exception:
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
>>>>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>>>>
>>>>>>>>>>> Anybody have any thoughts? There's got to be a magical
>>>>>>>>>>> incantation that I have slightly off.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Director of Data Science
>>>>> Cloudera <http://www.cloudera.com>
>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Looking for some guidance in building a basic Avro pipeline

Posted by Jonathan Natkins <na...@cloudera.com>.
Cool, JIRA filed: https://issues.apache.org/jira/browse/CRUNCH-129


Re: Looking for some guidance in building a basic Avro pipeline

Posted by Josh Wills <jw...@cloudera.com>.
No, you're not-- I think that's a bug. Switching to "parallelDo" instead of
"by" on the groupedData object will work fine, but we should make sure the
by() operation works on grouped tables.
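
A likely explanation for the ClassCastException in Natty's trace: by() keeps each element as the value and pairs it with a key computed from that element, so the resulting table's value type is the collection's own type. For a PGroupedTable that element is `Pair<K, Iterable<V>>`, described by an `AvroGroupedTableType`, while `Avros.tableOf` expects an `AvroType` for both sides. A parallelDo with a MapFn that returns the MyAvroGroup directly only needs the output type, which is why it sidesteps the problem. The plain-Java sketch below uses hypothetical names and no Crunch classes; it only contrasts the two output shapes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class ByVersusParallelDo {
  // Shape of by(): each element is kept as the value and paired with a derived key,
  // so the output needs a type for the original element as well as for the key.
  static <S, K> List<Map.Entry<K, S>> by(List<S> input, Function<S, K> keyFn) {
    List<Map.Entry<K, S>> out = new ArrayList<>();
    for (S element : input) {
      out.add(new SimpleEntry<>(keyFn.apply(element), element));
    }
    return out;
  }

  // Shape of a one-to-one parallelDo(): only the computed value is emitted,
  // so only the output type matters.
  static <S, T> List<T> parallelDo(List<S> input, Function<S, T> fn) {
    List<T> out = new ArrayList<>();
    for (S element : input) {
      out.add(fn.apply(element));
    }
    return out;
  }

  public static void main(String[] args) {
    // Grouped data: each element is (key, values), like Pair<String, Iterable<V>>.
    Map.Entry<String, List<Integer>> a = new SimpleEntry<>("a", List.of(1, 2));
    Map.Entry<String, List<Integer>> b = new SimpleEntry<>("b", List.of(3));
    List<Map.Entry<String, List<Integer>>> grouped = List.of(a, b);

    // by(): the whole grouped element rides along as the value -> [2=a=[1, 2], 1=b=[3]]
    System.out.println(by(grouped, e -> e.getValue().size()));
    // parallelDo(): only the derived value comes out -> [a:2, b:1]
    System.out.println(parallelDo(grouped, e -> e.getKey() + ":" + e.getValue().size()));
  }
}
```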


Re: Looking for some guidance in building a basic Avro pipeline

Posted by Jonathan Natkins <na...@cloudera.com>.
Alright, I'm back for more. This time, I'm trying to perform a group by
with Avro data. What I've currently got is this:

    PGroupedTable<String, MyAvroObject> processedData = data.parallelDo(
        new DoFn<String, Pair<String, MyAvroObject>>() {
          public void process(String line, Emitter<Pair<String, MyAvroObject>> emitter) {
            String key = getKey(line);
            MyAvroObject value = convertToAvroObject(line);
            emitter.emit(Pair.of(key, value));
          }
        },
        Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
        .groupByKey(3);

    PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
        processedData.by(new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
            @Override
            public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
              MyAvroGroup group = new MyAvroGroup();
              group.objects = Lists.<MyAvroObject>newArrayList();

              for (MyAvroObject obj : input.second()) {
                group.objects.add(obj);
              }

              return group;
            }
          },
          Avros.specifics(MyAvroGroup.class));

I think this is all pretty sane, but I'm getting an exception when the
pipeline attempts to run the by():

12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
    at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
    at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
    at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)

Am I doing something obviously wrong?

Thanks,
Natty




Re: Looking for some guidance in building a basic Avro pipeline

Posted by Jonathan Natkins <na...@cloudera.com>.
To bring things full circle, the core issue was that I was writing the data
out the wrong way. Instead of

    pipeline.writeTextFile(words, args[1]);

I should have been using

    pipeline.write(words, To.avroFile(args[1]));

As Josh noted, writeTextFile was attempting to write my data out as a
String, but I wasn't giving it an object that Crunch knew how to turn into
a String, which resulted in an exception. Writing to an Avro file instead
solved those issues.
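For context, the message in the original stack trace (`ClassCastException: class com.company.MyAvroObject`) matches the format of `java.lang.Class.asSubclass`. The trace shows that method being called from `Writables.records`, which is presumably checking for Hadoop's `Writable`. When a class is not a subtype of the requested type, `asSubclass` throws a `ClassCastException` whose message is the class's `toString()`.

The JDK-only sketch below reproduces that failure. It assumes a hypothetical `WritableLike` interface as a stand-in for `Writable`, so no Hadoop dependency is needed. `MyPojo` and `WritablePojo` are also hypothetical.

```java
public class AsSubclassDemo {
    // Hypothetical stand-in for org.apache.hadoop.io.Writable (not the real interface).
    interface WritableLike {}

    // Hypothetical POJO that, like MyAvroObject, does not implement the interface.
    static class MyPojo {}

    // Hypothetical class that does implement it.
    static class WritablePojo implements WritableLike {}

    // Returns "ok" if the cast succeeds, otherwise the ClassCastException message.
    static String describeFailure(Class<?> clazz) {
        try {
            clazz.asSubclass(WritableLike.class);
            return "ok";
        } catch (ClassCastException e) {
            // asSubclass reports the offending class via Class.toString(), e.g. "class ..."
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure(MyPojo.class));       // class AsSubclassDemo$MyPojo
        System.out.println(describeFailure(WritablePojo.class)); // ok
    }
}
```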

Thanks, Josh!



Re: Looking for some guidance in building a basic Avro pipeline

Posted by Josh Wills <jw...@cloudera.com>.
Hey Natty,

Reply inlined.


On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <na...@cloudera.com>wrote:

> Hey Josh,
>
> That really doesn't solve the problem I'm facing. Avros.specifics assumes
> that I've got a Java file that Avro generated for me, which I don't have. I
> can certainly go through the trouble of getting that file, but what I've
> got currently is a POJO that I'm associating with a JSON Avro schema. It's
> a perfectly valid use case, and as far as I can tell, from what's provided
> by the Avros utility class, it should be supported. So here's my question:
>

Interesting-- I had not hit that use case for Avro before. For a POJO, I
would just use the reflection APIs, which are available via Avros.reflects.
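Avros.reflects builds on Avro's reflection support, which derives a schema from a class's fields rather than from generated code. The JDK-only sketch below illustrates only that underlying idea: it lists the instance fields of a hypothetical `MyPojo` class with `java.lang.reflect`. It is not Avro's `ReflectData`, and it makes no claims about how Avro maps Java types to schema types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class ReflectFieldsSketch {
    // Hypothetical POJO standing in for MyAvroObject; no generated code involved.
    static class MyPojo {
        String name;
        long timestamp;
        static int counter; // static: not part of an instance's state, so skipped below
    }

    // Collect instance field names and simple type names, sorted by field name.
    static Map<String, String> describeFields(Class<?> clazz) {
        Map<String, String> fields = new TreeMap<>();
        for (Field f : clazz.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue;
            }
            fields.put(f.getName(), f.getType().getSimpleName());
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(describeFields(MyPojo.class)); // {name=String, timestamp=long}
    }
}
```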


>
> Is the Avros.generics issue a bug? It seems to me that the T of PType<T>
> has to implement Writable, and in the case of the return type of
> Avros.generics, this is not the case.
>

There's no requirement for the T of a PType<T> to be a Writable, or even an
Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets you
create PTypes that depend on other PTypes, which is how Crunch handles
things like protocol buffers/thrift/jackson-style object serializations.
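The idea behind PTypes.derived is that a type for a new class can be defined in terms of an existing base type plus a pair of conversion functions. As I understand it, Crunch's `PTypes.derived` takes:

- the target class,
- an input MapFn (base to derived),
- an output MapFn (derived to base),
- the base PType.

Please check the Javadoc for your version before relying on that. The JDK-only sketch below models just the idea with `java.util.function.Function`. `DerivedType` and `Point` are hypothetical names, not Crunch classes.

```java
import java.util.function.Function;

public class DerivedTypeSketch {
    // Hypothetical analogue of a derived type: a value of type T is stored
    // using an existing base representation S, via two conversion functions.
    static final class DerivedType<S, T> {
        private final Function<S, T> inputFn;  // base -> derived, applied when reading
        private final Function<T, S> outputFn; // derived -> base, applied when writing

        DerivedType(Function<S, T> inputFn, Function<T, S> outputFn) {
            this.inputFn = inputFn;
            this.outputFn = outputFn;
        }

        S toBase(T value) { return outputFn.apply(value); }

        T fromBase(S stored) { return inputFn.apply(stored); }
    }

    // Hypothetical POJO with no serialization support of its own.
    static final class Point {
        final int x;
        final int y;

        Point(int x, int y) { this.x = x; this.y = y; }
    }

    // Point is derived from String: "x,y" in storage, Point in user code.
    static final DerivedType<String, Point> POINT_AS_STRING = new DerivedType<>(
        s -> {
            String[] parts = s.split(",");
            return new Point(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
        },
        p -> p.x + "," + p.y);

    public static void main(String[] args) {
        String stored = POINT_AS_STRING.toBase(new Point(3, 4));
        Point restored = POINT_AS_STRING.fromBase(stored);
        System.out.println(stored + " -> (" + restored.x + ", " + restored.y + ")"); // 3,4 -> (3, 4)
    }
}
```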

I'm just taking a closer look at the exception that was thrown, and it
looks to me like the problem is occurring at the end of the pipeline, where
you're calling pipeline.writeTextFile (not included in the code snippet
posted). Crunch has to convert the PType to something that can be converted
to a Writable implementation; if you try to write an Avro object to the
TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks to me
like in this case, Crunch can't figure out how to turn MyAvroObject into a
Writable instance for writing to the TextOutputFormat.
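The `AvroWrapper@feedbeef` output is what `java.lang.Object.toString()` produces for any class that doesn't override it: the class name, `@`, and the hash code in hex. Here is a JDK-only illustration using a hypothetical `Wrapper` class.

```java
public class DefaultToStringDemo {
    // Hypothetical stand-in for a wrapper type that does not override toString().
    static class Wrapper {
        final Object datum;

        Wrapper(Object datum) { this.datum = datum; }
    }

    // What a text sink would emit if it simply called toString() on each element.
    static String render(Object o) {
        return String.valueOf(o);
    }

    public static void main(String[] args) {
        Wrapper w = new Wrapper("payload");
        // Prints something like DefaultToStringDemo$Wrapper@1b6d3586; the hash varies per run.
        System.out.println(render(w));
    }
}
```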


> If it's a bug, then fine, I'll file a JIRA and jump through whatever
> necessary hoops exist.
>

One way to fix this would be to update writeTextFile to convert anything
passed into it that isn't already a String into one via an auxiliary
MapFn. I'm not sure why I didn't do that in the first place. What do you
think?
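The proposed fix amounts to putting a "to String" conversion step in front of the text output. The JDK-only sketch below shows that shape using a plain `java.util.function.Function` in place of a Crunch MapFn. `toTextLines` is a hypothetical helper, not part of Crunch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TextSinkSketch {
    // Convert every element to a String before it reaches a text-only sink.
    // Strings pass through unchanged; anything else goes through the formatter.
    static <T> List<String> toTextLines(Iterable<T> items, Function<? super T, String> formatter) {
        List<String> lines = new ArrayList<>();
        for (T item : items) {
            if (item instanceof String) {
                lines.add((String) item);
            } else {
                lines.add(formatter.apply(item));
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList("already text", 42, 3.5);
        System.out.println(toTextLines(mixed, Object::toString)); // [already text, 42, 3.5]
    }
}
```

Passing an explicit formatter, rather than always calling `toString()`, leaves room for a readable rendering of types whose default `toString()` is the `ClassName@hash` form.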


> Thanks,
> Natty


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Looking for some guidance in building a basic Avro pipeline

Posted by Jonathan Natkins <na...@cloudera.com>.
Hey Josh,

That really doesn't solve the problem I'm facing. Avros.specifics assumes
that I've got a Java file that Avro generated for me, which I don't have. I
can certainly go through the trouble of getting that file, but what I've
got currently is a POJO that I'm associating with a JSON Avro schema. It's
a perfectly valid use case, and as far as I can tell, from what's provided
by the Avros utility class, it should be supported. So here's my question:

Is the Avros.generics issue a bug? It seems to me that the T of PType<T>
has to implement Writable, and in the case of the return type of
Avros.generics, this is not the case.

If it's a bug, then fine, I'll file a JIRA and jump through whatever
necessary hoops exist.

Thanks,
Natty



Re: Looking for some guidance in building a basic Avro pipeline

Posted by Josh Wills <jo...@gmail.com>.
Did you look at Avros.specifics?
On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <na...@cloudera.com> wrote:

> Ok, I'm still a little confused. Let's say I use Avros.generics(), and
> then I modify my code to use GenericData.Records. Those Records still don't
> implement the Writable interface, so I'm still getting a class cast
> exception. Did I do something totally wrong?

Re: Looking for some guidance in building a basic Avro pipeline

Posted by Jonathan Natkins <na...@cloudera.com>.
Ok, I'm still a little confused. Let's say I use Avros.generics(), and then
I modify my code to use GenericData.Records. Those Records still don't
implement the Writable interface, so I'm still getting a class cast
exception. Did I do something totally wrong?



Re: Looking for some guidance in building a basic Avro pipeline

Posted by Jonathan Natkins <na...@cloudera.com>.
Well, the problem with that is that I really want to work with my objects,
rather than use Avros.generics, because then I'm forced to treat everything
as a GenericData.Record. It's just a pain in the butt.
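The ergonomic difference is this: a generic record is read by field name and needs a cast, while a POJO exposes typed fields that the compiler checks. In the JDK-only sketch below, a `Map<String, Object>` stands in for Avro's `GenericData.Record` (it is not the Avro API). `MyPojo` is a hypothetical class with the same fields.

```java
import java.util.HashMap;
import java.util.Map;

public class GenericVsTypedSketch {
    // Stand-in for a generic record: values are looked up by field name.
    static Map<String, Object> genericRecord(String name, long timestamp) {
        Map<String, Object> record = new HashMap<>();
        record.put("name", name);
        record.put("timestamp", timestamp);
        return record;
    }

    // Hypothetical typed POJO with the same two fields.
    static final class MyPojo {
        final String name;
        final long timestamp;

        MyPojo(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Generic access: string key plus cast; a misspelled key only fails at runtime.
    static long timestampOf(Map<String, Object> record) {
        return (Long) record.get("timestamp");
    }

    // Typed access: the compiler checks the field name and its type.
    static long timestampOf(MyPojo pojo) {
        return pojo.timestamp;
    }

    public static void main(String[] args) {
        System.out.println(timestampOf(genericRecord("a", 42L))); // 42
        System.out.println(timestampOf(new MyPojo("a", 42L)));    // 42
    }
}
```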



Re: Looking for some guidance in building a basic Avro pipeline

Posted by Josh Wills <jo...@gmail.com>.
You don't want to create an AvroType yourself; you want to call
o.a.c.types.avro.Avros.records or one of its friends and pass it a Class
object.

Interesting, though: I would still want that case to work correctly.
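Passing a Class object lets a factory like Avros.records inspect the class and pick a serialization strategy itself. My understanding is that it distinguishes Avro-generated (SpecificRecord) classes from other classes, which go through reflection, but treat that as an assumption to check against the Crunch source for your version.

The JDK-only sketch below shows the general pattern with hypothetical names. `GeneratedRecord` is a marker interface standing in for `SpecificRecord`; `GeneratedEvent` and `PlainEvent` are example classes. It is not Crunch's actual dispatch logic.

```java
public class RecordTypeDispatchSketch {
    // Hypothetical marker interface standing in for Avro's SpecificRecord.
    interface GeneratedRecord {}

    // Hypothetical classes: one "generated" record and one plain POJO.
    static class GeneratedEvent implements GeneratedRecord {}

    static class PlainEvent {}

    // Inspect the Class object to decide how values of that class should be serialized.
    static String chooseStrategy(Class<?> clazz) {
        if (GeneratedRecord.class.isAssignableFrom(clazz)) {
            return "specific"; // generated code carries its own schema
        }
        return "reflect"; // otherwise derive a schema from the class via reflection
    }

    public static void main(String[] args) {
        System.out.println(chooseStrategy(GeneratedEvent.class)); // specific
        System.out.println(chooseStrategy(PlainEvent.class));     // reflect
    }
}
```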

Josh