Posted to user@avro.apache.org by Harsh J <ha...@cloudera.com> on 2013/08/01 18:35:43 UTC

Re: Mapper not called

I've often found that the cause of this symptom is that the input files lack
a .avro extension. Is that the case for you? If so, can you retry after
renaming them?
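
If the extension does turn out to be the problem, below is a minimal sketch of
doing the rename in bulk, assuming the inputs sit in a single HDFS directory;
the class name and the directory argument are illustrative and not part of the
job quoted below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Appends the .avro extension to every plain file in the given directory that
// does not already carry it, so the files are no longer skipped on input
// (the issue described above).
public class AddAvroExtension {
  public static void main(String[] args) throws Exception {
    Path inputDir = new Path(args[0]);  // e.g. the directory passed to the job
    FileSystem fs = FileSystem.get(new Configuration());
    for (FileStatus status : fs.listStatus(inputDir)) {
      Path src = status.getPath();
      if (!fs.isFile(src) || src.getName().endsWith(".avro")) {
        continue;  // leave directories and correctly named files alone
      }
      Path dst = new Path(src.getParent(), src.getName() + ".avro");
      System.out.println("Renaming " + src + " to " + dst);
      fs.rename(src, dst);
    }
  }
}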

On Wed, Jul 31, 2013 at 1:02 AM, Anna Lahoud <an...@gmail.com> wrote:
> I am following the directions at
> http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html
> for writing a job that takes Avro files as input and produces non-Avro output,
> and I created the job below. I should note that I have tried different
> orderings of the setInputPaths/setOutputPath lines, the AvroJob lines, and the
> reduce task settings. The result is always the same: the job runs with 0
> mappers and 1 reducer (which receives no data, so the output is essentially an
> empty SequenceFile). The job always reports 10 input files, so that's not the
> issue. There is an @Override annotation on both my map and my reduce, so that's
> not the issue either. And I believe I have correctly followed the Avro
> input/non-Avro output instructions in the link above. Any other ideas would be
> welcome!
>
>
> public class MyAvroJob extends Configured implements Tool {
>
>   @Override
>   public int run(String[] args) throws Exception {
>
>     JobConf job = new JobConf(getConf(), this.getClass());
>
>     FileInputFormat.setInputPaths(job, new Path(args[0]));
>     FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>     AvroJob.setMapperClass(job, MyAvroMapper.class);
>     AvroJob.setInputSchema(job, MySchema.SCHEMA$);
>     AvroJob.setMapOutputSchema(job,
>         Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));
>
>     job.setReducerClass(MyNonAvroReducer.class);
>     job.setOutputFormat(SequenceFileOutputFormat.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(Text.class);
>     job.setNumReduceTasks(1);
>
>     return JobClient.runJob(job).isSuccessful() ? 0 : 1;
>   }
>
>   public static class MyAvroMapper extends AvroMapper<MySchema, Pair<String, String>> {
>
>     @Override
>     public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
>         Reporter reporter) throws IOException {
>
>       List<MyThings> things = in.getRecords();
>       ...
>       collector.collect(new Pair<String, String>(newKey, newValue));
>     }
>   }
>
>   public static class MyNonAvroReducer extends MapReduceBase
>       implements Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {
>
>     @Override
>     public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
>         OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
>       while (values.hasNext()) {
>         output.collect(new Text(key.datum()), new Text(values.next().datum()));
>       }
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     System.exit(ToolRunner.run(new MyAvroJob(), args));
>   }
> }
>
>
>
>
> -Anna
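
As an additional sanity check on the inputs themselves, here is a minimal
sketch that opens a single file with Avro's DataFileReader and prints the
schema stored in its header; the path argument is a placeholder (copy one file
out of HDFS first, e.g. with hadoop fs -get) and the class name is illustrative.

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Opens one Avro container file, prints the writer schema from its header,
// and prints the first record to confirm the file is readable.
public class InspectAvroFile {
  public static void main(String[] args) throws Exception {
    File input = new File(args[0]);  // placeholder: a local copy of one input file
    DataFileReader<GenericRecord> reader =
        new DataFileReader<GenericRecord>(input, new GenericDatumReader<GenericRecord>());
    try {
      System.out.println("Schema: " + reader.getSchema());
      if (reader.hasNext()) {
        System.out.println("First record: " + reader.next());
      }
    } finally {
      reader.close();
    }
  }
}

If the file opens and prints a schema here, it is a valid Avro container, which
leaves the missing .avro extension as the main remaining suspect.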



-- 
Harsh J