Posted to user@avro.apache.org by Terry Healy <th...@bnl.gov> on 2012/12/21 18:56:05 UTC

AVRO threatening to ruin Christmas

Going crazy here trying to reconcile this. I found links to some
aspects, partially implemented in the 'Hadoop: The Definitive Guide'
Avro weather M/R example, and outlined in the org.apache.avro.mapred
package docs under "For jobs whose input is an Avro data file and
which use an AvroMapper, but whose reducer is a non-Avro Reducer and
whose output is a non-Avro format:". Clearly I have misunderstood
something while attempting to follow those instructions.

The test code does not include a mapper, so the job setup is not like
what I'm trying to achieve: Avro format into the Mapper, Text out of
the Reducer. (I've dropped the Partitioner, Comparator, and
GroupingComparator used in the working M/R code that reads .tsv
rather than Avro.)

The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
class. I think my existing reducer would work fine, but when I use it
with the AvroMapper it throws the above exception.


From setup:

        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongPair.class);
        conf.setOutputValueClass(AvroFlowWritable.class);

        NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);

        AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
        AvroJob.setMapperClass(conf, AvroFlowMapper.class);
        AvroJob.setReducerClass(conf, AvroFlowReducer.class);

        Schema afwSchema =
                ReflectData.get().getSchema(AvroFlowWritable.class);
        Schema pairSchema = Pair.getPairSchema(
                Schema.create(Schema.Type.LONG), afwSchema);
        AvroJob.setMapOutputSchema(conf, pairSchema);
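
One thing I'm not sure about: pairSchema is built with ReflectData,
and as far as I can tell the map output otherwise defaults to the
generic representation, which expects an IndexedRecord. If this Avro
version has AvroJob.setMapOutputReflect, maybe the setup also needs
something like the following? (Just a guess on my part, untested.)

        // Guess: use the reflect representation for map output,
        // since pairSchema was built with ReflectData rather than
        // from a generic/generated record class. (Assumes this
        // AvroJob method exists in the version I'm running.)
        AvroJob.setMapOutputReflect(conf);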


    /*
     * ------------------
     * *** Mapper ***
     * ------------------
     */

    public static class AvroFlowMapper<K>
            extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

        Long[] keepIps;

        // Configure removed

        @Override
        public void map(K datum,
                AvroCollector<Pair<Long, AvroFlowWritable>> collector,
                Reporter reporter) throws IOException {

            GenericRecord record = (GenericRecord) datum;
            AvroFlowWritable afw = new AvroFlowWritable(record);

            if (isKeeper(afw)) {
                Long testKey;
                if (inKeeperIpList(afw.getSrcIp())) {
                    testKey = afw.getDstIp();
                } else {
                    testKey = afw.getSrcIp();
                }
                collector.collect(
                        new Pair<Long, AvroFlowWritable>(testKey, afw));
            }
        }
    }

    /*
     * ------------------
     * *** Reducer ***
     * ------------------
     */

    public static class AvroFlowReducer
            extends AvroReducer<Long, AvroFlowWritable, Text> {

        @Override
        public void reduce(Long key, Iterable<AvroFlowWritable> values,
                AvroCollector<Text> collector, Reporter reporter)
                throws IOException {

            for (AvroFlowWritable afw : values) {
                collector.collect(new Text(afw.toString()));
            }
        }
    }



On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using AVRO within Map/Reduce and trying to
> convert some existing non-AVRO code to use AVRO input. So far the data
> that previously was stored in tab delimited files has been converted to
> .avro successfully as checked with avro-tools.
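> For anyone following along, the spot-check was roughly this (jar
> and file names are placeholders):
> 
>     java -jar avro-tools.jar tojson flows.avro | head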
> 
> Where I'm getting hung up extending beyond my book-based examples is in
> attempting to read from AVRO (using generic records) where the mapper
> output is NOT in AVRO format. I can't seem to reconcile extending
> AvroMapper and NOT using AvroCollector.
> 
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
> 
> -Terry
> 
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
> 
>     public static class HdFlowMapper extends MapReduceBase
>             implements Mapper<Text, HdFlowWritable,
>                     LongPair, HdFlowWritable> {
> 
> 
>         @Override
>         public void map(Text key, HdFlowWritable value,
>                 OutputCollector<LongPair, HdFlowWritable> output,
>                 Reporter reporter) throws IOException {
> 
> 		...//
>                 outKey = new LongPair(value.getSrcIp(), value.getFirst());
> 
>                 HdFlowWritable outValue = value; // pass it all through
>                 output.collect(outKey, outValue);
> 	}
> 
> 

Re: AVRO threatening to ruin Christmas

Posted by Russell Jurney <ru...@gmail.com>.
Since the holidays are involved, I suggest you try Pig and the AvroStorage
UDF to load the data, and another UDF to store the data. What format are
you writing in?

Instructions for using AvroStorage with Pig 0.10 are here:
http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/
On Dec 21, 2012 9:56 AM, "Terry Healy" <th...@bnl.gov> wrote:

> [...]

Re: AVRO threatening to ruin Christmas

Posted by Terry Healy <th...@bnl.gov>.
Doug-

I have tried to use that test as a reference, but being new to this I
can't reconcile the "missing" mapper, or bridge the gap in my mind
between a SequenceFile input and an Avro data file. So I still wind
up with the "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord" exception.

Have a good Christmas - maybe I'll get an Avro book.

-TH

On 12/21/2012 02:09 PM, Doug Cutting wrote:
> [...]

Re: AVRO threatening to ruin Christmas

Posted by Doug Cutting <cu...@apache.org>.
On Fri, Dec 21, 2012 at 9:56 AM, Terry Healy <th...@bnl.gov> wrote:
> "For jobs
> whose input is an Avro data file and which use an AvroMapper, but whose
> reducer is a non-Avro Reducer and whose output is a non-Avro format:".
[ ...]
>     public static class AvroFlowReducer extends AvroReducer

It looks to me like your reducer is an Avro reducer, when in this
case you should instead subclass org.apache.hadoop.mapred.Reducer.

An example of this is in TestSequenceFileReader#testNonAvroReducer:

http://s.apache.org/testnonavroreducer
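
Adapted to your job, it would look roughly like this. This is an
untested sketch: AvroFlowWritable is your class, and with a non-Avro
reducer the input arrives wrapped as AvroKey/AvroValue, so you unwrap
with datum(). Whether the datum comes back as your AvroFlowWritable
or as a generic record depends on which representation your map
output schema uses, so check that. With imports along the lines of:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

and a reducer nested next to your mapper:

    public static class FlowTextReducer extends MapReduceBase
            implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
                    Text, NullWritable> {

        @Override
        public void reduce(AvroKey<Long> key,
                Iterator<AvroValue<AvroFlowWritable>> values,
                OutputCollector<Text, NullWritable> output,
                Reporter reporter) throws IOException {
            while (values.hasNext()) {
                // Unwrap the Avro value and emit its text form.
                output.collect(new Text(values.next().datum().toString()),
                        NullWritable.get());
            }
        }
    }

Then register it with the plain Hadoop call rather than AvroJob, and
declare the non-Avro output types:

    conf.setReducerClass(FlowTextReducer.class); // not AvroJob.setReducerClass
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(NullWritable.class);
    conf.setOutputFormat(TextOutputFormat.class);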

I hope this helps.

Doug