You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@avro.apache.org by DSuiter RDX <ds...@rdx.com> on 2013/10/23 20:59:38 UTC

Avro Input > Text output MapReduce new Avro API

Hello,

We are trying to take an Avro input, send it through mapreduce, and get a
text output. We are trying to stay consistent with the
org.apache.avro.*mapreduce
*API, and are having a lot of trouble understanding how to extract the
wrapped Avro input and map it out as text. Here is our driver and mapper,
and a log of the error. Would anyone be able to provide some direction? I
have seen the examples from the Avro project pages, and the popularly
referenced Git examples, but they all are written using org.apache.avro.*
mapred* API. There is one example from Chris White on Gist for Avro
*output* but
nothing for input. It seems like even that one uses the
org.apache.avro.mapred.AvroKey, which you will see our code uses as well. I
have seen mention of using the org.apache.avro.mapreduce.* input objects
and output objects as wrappers for mapreduce primitives, but cannot find
examples anywhere, and there are no examples or instructions on the apache
pages as there are for org.apache.avro.mapred.

Driver:
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RTTicketCount extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
      System.err
.printf("Usage: RTTicketCount <input dir> <output dir>\n");
return -1;
}

    // String input = args[0];
    // String output = args[1];

Job job = new Job(getConf());
job.setJobName("RTTicketCount");
job.setJarByClass(RTTicketCount.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(TicketMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
    job.setCombinerClass(SumReducer.class);
job.setReducerClass(SumReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
 AvroJob.setInputKeySchema(job, Event.getClassSchema());
job.setInputFormatClass(AvroKeyInputFormat.class);

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new RTTicketCount(),
args);
System.exit(exitCode);
}
}

Mapper:
import java.io.IOException;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TicketMapper extends
Mapper<AvroKey<Event>, NullWritable, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
  protected void map(AvroKey<Event> key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
    String check = key.datum().getBody().toString();
    if (check.contains("Ticket")) {
      context.write(new Text(check), one);
    }
  }

Log output of exception:

13/10/23 13:57:11 INFO mapred.JobClient: Task Id :
attempt_201310160855_0007_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected
org.apache.hadoop.io.Text, recieved org.apache.avro.mapred.AvroKey
at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:890)
at
org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:601)
at
org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
at
org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:106)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:120)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred


Thank You,
*Devin Suiter*
Jr. Data Solutions Software Engineer
100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
Google Voice: 412-256-8556 | www.rdx.com

Re: Avro Input > Text output MapReduce new Avro API

Posted by Marshall Bockrath-Vandegrift <ll...@gmail.com>.
DSuiter RDX <ds...@rdx.com> writes:

> protected void map(AvroKey<Event> key, Iterable<NullWritable> values,
> Context context) throws IOException, InterruptedException {

It looks like your `map()` method doesn’t have the right type signature,
seemingly following what’s necessary for `Reducer#reduce()` instead.
Without an `@Override` annotation, I believe this is silently defining a
separate `map()` method with a different type signature, leaving the
default pass-through `Mapper#map()` as-in.

-Marshall