You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@avro.apache.org by John Pauley <Jo...@threattrack.com> on 2014/02/28 16:18:31 UTC
[hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Hello all,
I'm having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), with a default of null, when the value is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop... Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Sample map/reduce driver that reproduces a
 * {@code DataFileWriter$AppendWriteException} from AvroMultipleOutputs when a
 * record holds a non-null value in a ["null", fixed] union field.
 *
 * <p>The job first writes a small Avro container file to
 * /tmp/avrotest/input, then runs a map-only job whose mapper copies each
 * record unchanged to the named output "avro".
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // define a schema with a union of null and fixed ("baz" defaults to null)
    private static final String SCHEMA = "{\n" +
        " \"namespace\": \"com.foo.bar\",\n" +
        " \"name\": \"simple_schema\",\n" +
        " \"type\": \"record\",\n" +
        " \"fields\": [{\n" +
        " \"name\": \"foo\",\n" +
        " \"type\": {\n" +
        " \"name\": \"bar\",\n" +
        " \"type\": \"fixed\",\n" +
        " \"size\": 2\n" +
        " }\n" +
        " }, {\n" +
        " \"name\": \"baz\",\n" +
        " \"type\": [\"null\", \"bar\"],\n" +
        " \"default\": null\n" +
        " }]\n" +
        "}";

    /**
     * Map-only task: forwards every input record unchanged to the named
     * output "avro" via AvroMultipleOutputs. Emits nothing on the normal
     * map output channel.
     */
    public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // must be closed, or buffered named-output records are lost
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // simply write the record to a container using AvroMultipleOutputs
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the input container file and runs the job.
     *
     * @param args command-line arguments (generic Hadoop options are
     *             already consumed by ToolRunner)
     * @return 0 on job success, 1 on failure
     * @throws Exception on schema-parse, I/O, or job-submission failure
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(SCHEMA);
        //
        // generate avro container file for input to mapper
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        // "baz" is a ["null", "bar"] union; index 1 is the fixed branch
        GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        File file = new File("/tmp/avrotest/input/test.avro");
        // FIX: ensure the parent directory exists, otherwise
        // DataFileWriter.create() fails with FileNotFoundException on a
        // fresh machine
        File parent = file.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IOException("could not create input directory: " + parent);
        }
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        // FIX: try-with-resources so the writer is closed (and the file is
        // not left truncated/locked) even if append() throws
        try (DataFileWriter<GenericRecord> dataFileWriter =
                new DataFileWriter<GenericRecord>(datumWriter)) {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        }

        //
        // configure and run job
        // FIX: use the Configuration injected by ToolRunner (generic options
        // such as -D/-conf are already parsed into it) instead of building a
        // fresh Configuration and discarding the parsed options
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // FIX: required for the job jar to be shipped when run on a cluster
        job.setJarByClass(AvroContainerFileDriver.class);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only

        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);

        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Entry point; delegates to ToolRunner so generic Hadoop options are
     * honored and the exit code reflects job success.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
        System.exit(exitCode);
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
Which version of hadoop are you using?
There's a possibility that the hadoop environment already has an avro*.jar
in place, thus causing a jar conflict.
Regards,
*Stanley Shi,*
On Tue, Mar 4, 2014 at 11:25 PM, John Pauley <Jo...@threattrack.com>wrote:
> Outside hadoop: avro-1.7.6
> Inside hadoop: avro-mapred-1.7.6-hadoop2
>
>
> From: Stanley Shi <ss...@gopivotal.com>
> Reply-To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Date: Monday, March 3, 2014 at 8:30 PM
> To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Subject: Re: [hadoop] AvroMultipleOutputs
> org.apache.avro.file.DataFileWriter$AppendWriteException
>
> which avro version are you using when running outside of hadoop?
>
> Regards,
> *Stanley Shi,*
>
>
>
> On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>wrote:
>
>> This is cross posted to avro-user list (
>> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
>> ).
>>
>> Hello all,
>>
>> I'm having an issue using AvroMultipleOutputs in a map/reduce job. The
>> issue occurs when using a schema that has a union of null and a fixed
>> (among other complex types), default to null, and it is not null.
>> Please find the full stack trace below and a sample map/reduce job that
>> generates an Avro container file and uses that for the m/r input. Note
>> that I can serialize/deserialize without issue using
>> GenericDatumWriter/GenericDatumReader outside of hadoop... Any insight would
>> be helpful.
>>
>> Stack trace:
>> java.lang.Exception:
>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
>> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> at java.lang.Thread.run(Thread.java:695)
>> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema
>> in union null of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>> ... 16 more
>> Caused by: java.lang.NullPointerException
>> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
>> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
>> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
>> at
>> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>>
>> Sample m/r job:
>> <mr_job>
>> package com.tts.ox.mapreduce.example.avro;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileWriter;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.mapred.AvroKey;
>> import org.apache.avro.mapreduce.AvroJob;
>> import org.apache.avro.mapreduce.AvroKeyInputFormat;
>> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
>> import org.apache.avro.mapreduce.AvroMultipleOutputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.conf.Configured;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.NullWritable;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.Mapper;
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> import org.apache.hadoop.util.GenericOptionsParser;
>> import org.apache.hadoop.util.Tool;
>> import org.apache.hadoop.util.ToolRunner;
>>
>> import java.io.File;
>> import java.io.IOException;
>>
>> public class AvroContainerFileDriver extends Configured implements Tool
>> {
>> //
>> // define a schema with a union of null and fixed
>> private static final String SCHEMA = "{\n" +
>> " \"namespace\": \"com.foo.bar\",\n" +
>> " \"name\": \"simple_schema\",\n" +
>> " \"type\": \"record\",\n" +
>> " \"fields\": [{\n" +
>> " \"name\": \"foo\",\n" +
>> " \"type\": {\n" +
>> " \"name\": \"bar\",\n" +
>> " \"type\": \"fixed\",\n" +
>> " \"size\": 2\n" +
>> " }\n" +
>> " }, {\n" +
>> " \"name\": \"baz\",\n" +
>> " \"type\": [\"null\", \"bar\"],\n" +
>> " \"default\": null\n" +
>> " }]\n" +
>> "}";
>>
>> public static class SampleMapper extends
>> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
>> private AvroMultipleOutputs amos;
>>
>> @Override
>> protected void setup(Context context) {
>> amos = new AvroMultipleOutputs(context);
>> }
>>
>> @Override
>> protected void cleanup(Context context) throws IOException,
>> InterruptedException {
>> amos.close();
>> }
>>
>> @Override
>> protected void map(AvroKey<GenericRecord> record, NullWritable
>> ignore, Context context)
>> throws IOException, InterruptedException {
>> // simply write the record to a container using AvroMultipleOutputs
>> amos.write("avro", new
>> AvroKey<GenericRecord>(record.datum()), NullWritable.get());
>> }
>> }
>>
>> @Override
>> public int run(final String[] args) throws Exception {
>> Schema.Parser parser = new Schema.Parser();
>> Schema schema = parser.parse(SCHEMA);
>>
>> //
>> // generate avro container file for input to mapper
>> byte[] dummy = {(byte) 0x01, (byte) 0x02};
>> GenericData.Fixed foo = new
>> GenericData.Fixed(schema.getField("foo").schema(), dummy);
>> GenericData.Fixed baz = new
>> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>>
>> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
>> .set(schema.getField("foo"), foo);
>> GenericRecord record0 = builder.build(); // baz is null
>>
>> builder.set(schema.getField("baz"), baz);
>> GenericRecord record1 = builder.build(); // baz is not null, bad
>> news
>>
>> File file = new File("/tmp/avrotest/input/test.avro");
>> DatumWriter<GenericRecord> datumWriter = new
>> GenericDatumWriter<GenericRecord>(schema);
>> DataFileWriter<GenericRecord> dataFileWriter = new
>> DataFileWriter<GenericRecord>(datumWriter);
>> dataFileWriter.create(schema, file);
>> dataFileWriter.append(record0);
>> //
>> // HELP: job succeeds when we do not have record with non-null
>> baz, comment out to succeed
>> //
>> dataFileWriter.append(record1);
>> dataFileWriter.close();
>>
>> //
>> // configure and run job
>> Configuration configuration = new Configuration();
>> String[] otherArgs = new GenericOptionsParser(configuration,
>> args).getRemainingArgs();
>> Job job = Job.getInstance(configuration, "Sample Avro Map
>> Reduce");
>>
>> job.setInputFormatClass(AvroKeyInputFormat.class);
>> AvroJob.setInputKeySchema(job, schema);
>>
>> job.setMapperClass(SampleMapper.class);
>> job.setNumReduceTasks(0);
>>
>> AvroJob.setOutputKeySchema(job, schema);
>> AvroMultipleOutputs.addNamedOutput(job, "avro",
>> AvroKeyOutputFormat.class, schema);
>>
>> FileInputFormat.addInputPath(job, new
>> Path(("/tmp/avrotest/input")));
>> FileOutputFormat.setOutputPath(job, new
>> Path("/tmp/avrotest/output"));
>>
>> return (job.waitForCompletion(true) ? 0 : 1);
>> }
>>
>> public static void main(String[] args) throws Exception {
>> int exitCode = ToolRunner.run(new AvroContainerFileDriver(),
>> args);
>> System.exit(exitCode);
>> }
>> }
>> </mr_job>
>>
>> Thanks,
>> John Pauley
>> Sr. Software Engineer
>> ThreatTrack Security
>>
>>
>
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
Which version of hadoop are you using?
There's a possibility that the hadoop environment already has an avro*.jar
in place, thus causing a jar conflict.
Regards,
*Stanley Shi,*
On Tue, Mar 4, 2014 at 11:25 PM, John Pauley <Jo...@threattrack.com>wrote:
> Outside hadoop: avro-1.7.6
> Inside hadoop: avro-mapred-1.7.6-hadoop2
>
>
> From: Stanley Shi <ss...@gopivotal.com>
> Reply-To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Date: Monday, March 3, 2014 at 8:30 PM
> To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Subject: Re: [hadoop] AvroMultipleOutputs
> org.apache.avro.file.DataFileWriter$AppendWriteException
>
> which avro version are you using when running outside of hadoop?
>
> Regards,
> *Stanley Shi,*
>
>
>
> On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>wrote:
>
>> This is cross posted to avro-user list (
>> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
>> ).
>>
>> Hello all,
>>
>> I'm having an issue using AvroMultipleOutputs in a map/reduce job. The
>> issue occurs when using a schema that has a union of null and a fixed
>> (among other complex types), default to null, and it is not null.
>> Please find the full stack trace below and a sample map/reduce job that
>> generates an Avro container file and uses that for the m/r input. Note
>> that I can serialize/deserialize without issue using
>> GenericDatumWriter/GenericDatumReader outside of hadoop... Any insight would
>> be helpful.
>>
>> Stack trace:
>> java.lang.Exception:
>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
>> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> at java.lang.Thread.run(Thread.java:695)
>> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema
>> in union null of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>> ... 16 more
>> Caused by: java.lang.NullPointerException
>> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
>> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
>> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
>> at
>> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>>
>> Sample m/r job:
>> <mr_job>
>> package com.tts.ox.mapreduce.example.avro;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileWriter;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.mapred.AvroKey;
>> import org.apache.avro.mapreduce.AvroJob;
>> import org.apache.avro.mapreduce.AvroKeyInputFormat;
>> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
>> import org.apache.avro.mapreduce.AvroMultipleOutputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.conf.Configured;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.NullWritable;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.Mapper;
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> import org.apache.hadoop.util.GenericOptionsParser;
>> import org.apache.hadoop.util.Tool;
>> import org.apache.hadoop.util.ToolRunner;
>>
>> import java.io.File;
>> import java.io.IOException;
>>
>> public class AvroContainerFileDriver extends Configured implements Tool
>> {
>> //
>> // define a schema with a union of null and fixed
>> private static final String SCHEMA = "{\n" +
>> " \"namespace\": \"com.foo.bar\",\n" +
>> " \"name\": \"simple_schema\",\n" +
>> " \"type\": \"record\",\n" +
>> " \"fields\": [{\n" +
>> " \"name\": \"foo\",\n" +
>> " \"type\": {\n" +
>> " \"name\": \"bar\",\n" +
>> " \"type\": \"fixed\",\n" +
>> " \"size\": 2\n" +
>> " }\n" +
>> " }, {\n" +
>> " \"name\": \"baz\",\n" +
>> " \"type\": [\"null\", \"bar\"],\n" +
>> " \"default\": null\n" +
>> " }]\n" +
>> "}";
>>
>> public static class SampleMapper extends
>> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
>> private AvroMultipleOutputs amos;
>>
>> @Override
>> protected void setup(Context context) {
>> amos = new AvroMultipleOutputs(context);
>> }
>>
>> @Override
>> protected void cleanup(Context context) throws IOException,
>> InterruptedException {
>> amos.close();
>> }
>>
>> @Override
>> protected void map(AvroKey<GenericRecord> record, NullWritable
>> ignore, Context context)
>> throws IOException, InterruptedException {
>> // simply write the record to a container using AvroMultipleOutputs
>> amos.write("avro", new
>> AvroKey<GenericRecord>(record.datum()), NullWritable.get());
>> }
>> }
>>
>> @Override
>> public int run(final String[] args) throws Exception {
>> Schema.Parser parser = new Schema.Parser();
>> Schema schema = parser.parse(SCHEMA);
>>
>> //
>> // generate avro container file for input to mapper
>> byte[] dummy = {(byte) 0x01, (byte) 0x02};
>> GenericData.Fixed foo = new
>> GenericData.Fixed(schema.getField("foo").schema(), dummy);
>> GenericData.Fixed baz = new
>> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>>
>> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
>> .set(schema.getField("foo"), foo);
>> GenericRecord record0 = builder.build(); // baz is null
>>
>> builder.set(schema.getField("baz"), baz);
>> GenericRecord record1 = builder.build(); // baz is not null, bad
>> news
>>
>> File file = new File("/tmp/avrotest/input/test.avro");
>> DatumWriter<GenericRecord> datumWriter = new
>> GenericDatumWriter<GenericRecord>(schema);
>> DataFileWriter<GenericRecord> dataFileWriter = new
>> DataFileWriter<GenericRecord>(datumWriter);
>> dataFileWriter.create(schema, file);
>> dataFileWriter.append(record0);
>> //
>> // HELP: job succeeds when we do not have record with non-null
>> baz, comment out to succeed
>> //
>> dataFileWriter.append(record1);
>> dataFileWriter.close();
>>
>> //
>> // configure and run job
>> Configuration configuration = new Configuration();
>> String[] otherArgs = new GenericOptionsParser(configuration,
>> args).getRemainingArgs();
>> Job job = Job.getInstance(configuration, "Sample Avro Map
>> Reduce");
>>
>> job.setInputFormatClass(AvroKeyInputFormat.class);
>> AvroJob.setInputKeySchema(job, schema);
>>
>> job.setMapperClass(SampleMapper.class);
>> job.setNumReduceTasks(0);
>>
>> AvroJob.setOutputKeySchema(job, schema);
>> AvroMultipleOutputs.addNamedOutput(job, "avro",
>> AvroKeyOutputFormat.class, schema);
>>
>> FileInputFormat.addInputPath(job, new
>> Path(("/tmp/avrotest/input")));
>> FileOutputFormat.setOutputPath(job, new
>> Path("/tmp/avrotest/output"));
>>
>> return (job.waitForCompletion(true) ? 0 : 1);
>> }
>>
>> public static void main(String[] args) throws Exception {
>> int exitCode = ToolRunner.run(new AvroContainerFileDriver(),
>> args);
>> System.exit(exitCode);
>> }
>> }
>> </mr_job>
>>
>> Thanks,
>> John Pauley
>> Sr. Software Engineer
>> ThreatTrack Security
>>
>>
>
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
Which version of hadoop are you using?
There's a possibility that the hadoop environment already has an avro*.jar
in place, thus causing a jar conflict.
Regards,
*Stanley Shi,*
On Tue, Mar 4, 2014 at 11:25 PM, John Pauley <Jo...@threattrack.com>wrote:
> Outside hadoop: avro-1.7.6
> Inside hadoop: avro-mapred-1.7.6-hadoop2
>
>
> From: Stanley Shi <ss...@gopivotal.com>
> Reply-To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Date: Monday, March 3, 2014 at 8:30 PM
> To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Subject: Re: [hadoop] AvroMultipleOutputs
> org.apache.avro.file.DataFileWriter$AppendWriteException
>
> which avro version are you using when running outside of hadoop?
>
> Regards,
> *Stanley Shi,*
>
>
>
> On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>wrote:
>
>> This is cross posted to avro-user list (
>> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
>> ).
>>
>> Hello all,
>>
>> I'm having an issue using AvroMultipleOutputs in a map/reduce job. The
>> issue occurs when using a schema that has a union of null and a fixed
>> (among other complex types), default to null, and it is not null.
>> Please find the full stack trace below and a sample map/reduce job that
>> generates an Avro container file and uses that for the m/r input. Note
>> that I can serialize/deserialize without issue using
>> GenericDatumWriter/GenericDatumReader outside of hadoop... Any insight would
>> be helpful.
>>
>> Stack trace:
>> java.lang.Exception:
>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
>> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> at java.lang.Thread.run(Thread.java:695)
>> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema
>> in union null of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>> ... 16 more
>> Caused by: java.lang.NullPointerException
>> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
>> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
>> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
>> at
>> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>>
>> Sample m/r job:
>> <mr_job>
>> package com.tts.ox.mapreduce.example.avro;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileWriter;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.mapred.AvroKey;
>> import org.apache.avro.mapreduce.AvroJob;
>> import org.apache.avro.mapreduce.AvroKeyInputFormat;
>> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
>> import org.apache.avro.mapreduce.AvroMultipleOutputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.conf.Configured;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.NullWritable;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.Mapper;
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> import org.apache.hadoop.util.GenericOptionsParser;
>> import org.apache.hadoop.util.Tool;
>> import org.apache.hadoop.util.ToolRunner;
>>
>> import java.io.File;
>> import java.io.IOException;
>>
>> public class AvroContainerFileDriver extends Configured implements Tool
>> {
>> //
>> // define a schema with a union of null and fixed
>> private static final String SCHEMA = "{\n" +
>> " \"namespace\": \"com.foo.bar\",\n" +
>> " \"name\": \"simple_schema\",\n" +
>> " \"type\": \"record\",\n" +
>> " \"fields\": [{\n" +
>> " \"name\": \"foo\",\n" +
>> " \"type\": {\n" +
>> " \"name\": \"bar\",\n" +
>> " \"type\": \"fixed\",\n" +
>> " \"size\": 2\n" +
>> " }\n" +
>> " }, {\n" +
>> " \"name\": \"baz\",\n" +
>> " \"type\": [\"null\", \"bar\"],\n" +
>> " \"default\": null\n" +
>> " }]\n" +
>> "}";
>>
>> public static class SampleMapper extends
>> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
>> private AvroMultipleOutputs amos;
>>
>> @Override
>> protected void setup(Context context) {
>> amos = new AvroMultipleOutputs(context);
>> }
>>
>> @Override
>> protected void cleanup(Context context) throws IOException,
>> InterruptedException {
>> amos.close();
>> }
>>
>> @Override
>> protected void map(AvroKey<GenericRecord> record, NullWritable
>> ignore, Context context)
>> throws IOException, InterruptedException {
>> // simply write the record to a container using AvroMultipleOutputs
>> amos.write("avro", new
>> AvroKey<GenericRecord>(record.datum()), NullWritable.get());
>> }
>> }
>>
>> @Override
>> public int run(final String[] args) throws Exception {
>> Schema.Parser parser = new Schema.Parser();
>> Schema schema = parser.parse(SCHEMA);
>>
>> //
>> // generate avro container file for input to mapper
>> byte[] dummy = {(byte) 0x01, (byte) 0x02};
>> GenericData.Fixed foo = new
>> GenericData.Fixed(schema.getField("foo").schema(), dummy);
>> GenericData.Fixed baz = new
>> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>>
>> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
>> .set(schema.getField("foo"), foo);
>> GenericRecord record0 = builder.build(); // baz is null
>>
>> builder.set(schema.getField("baz"), baz);
>> GenericRecord record1 = builder.build(); // baz is not null, bad
>> news
>>
>> File file = new File("/tmp/avrotest/input/test.avro");
>> DatumWriter<GenericRecord> datumWriter = new
>> GenericDatumWriter<GenericRecord>(schema);
>> DataFileWriter<GenericRecord> dataFileWriter = new
>> DataFileWriter<GenericRecord>(datumWriter);
>> dataFileWriter.create(schema, file);
>> dataFileWriter.append(record0);
>> //
>> // HELP: job succeeds when we do not have record with non-null
>> baz, comment out to succeed
>> //
>> dataFileWriter.append(record1);
>> dataFileWriter.close();
>>
>> //
>> // configure and run job
>> Configuration configuration = new Configuration();
>> String[] otherArgs = new GenericOptionsParser(configuration,
>> args).getRemainingArgs();
>> Job job = Job.getInstance(configuration, "Sample Avro Map
>> Reduce");
>>
>> job.setInputFormatClass(AvroKeyInputFormat.class);
>> AvroJob.setInputKeySchema(job, schema);
>>
>> job.setMapperClass(SampleMapper.class);
>> job.setNumReduceTasks(0);
>>
>> AvroJob.setOutputKeySchema(job, schema);
>> AvroMultipleOutputs.addNamedOutput(job, "avro",
>> AvroKeyOutputFormat.class, schema);
>>
>> FileInputFormat.addInputPath(job, new
>> Path(("/tmp/avrotest/input")));
>> FileOutputFormat.setOutputPath(job, new
>> Path("/tmp/avrotest/output"));
>>
>> return (job.waitForCompletion(true) ? 0 : 1);
>> }
>>
>> public static void main(String[] args) throws Exception {
>> int exitCode = ToolRunner.run(new AvroContainerFileDriver(),
>> args);
>> System.exit(exitCode);
>> }
>> }
>> </mr_job>
>>
>> Thanks,
>> John Pauley
>> Sr. Software Engineer
>> ThreatTrack Security
>>
>>
>
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
Which version of hadoop are you using?
There's a possibility that the hadoop environment already has an avro**.jar
in place, thus causing the jar conflict.
Regards,
*Stanley Shi,*
On Tue, Mar 4, 2014 at 11:25 PM, John Pauley <Jo...@threattrack.com>wrote:
> Outside hadoop: avro-1.7.6
> Inside hadoop: avro-mapred-1.7.6-hadoop2
>
>
> From: Stanley Shi <ss...@gopivotal.com>
> Reply-To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Date: Monday, March 3, 2014 at 8:30 PM
> To: "user@hadoop.apache.org" <us...@hadoop.apache.org>
> Subject: Re: [hadoop] AvroMultipleOutputs
> org.apache.avro.file.DataFileWriter$AppendWriteException
>
> which avro version are you using when running outside of hadoop?
>
> Regards,
> *Stanley Shi,*
>
>
>
> On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>wrote:
>
>> This is cross posted to avro-user list (
>> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
>> ).
>>
>> Hello all,
>>
>> I’m having an issue using AvroMultipleOutputs in a map/reduce job. The
>> issue occurs when using a schema that has a union of null and a fixed
>> (among other complex types), default to null, and it is not null.
>> Please find the full stack trace below and a sample map/reduce job that
>> generates an Avro container file and uses that for the m/r input. Note
>> that I can serialize/deserialize without issue using
>> GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would
>> be helpful.
>>
>> Stack trace:
>> java.lang.Exception:
>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
>> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
>> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
>> of union in field baz of com.foo.bar.simple_schema
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
>> at
>> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
>> at
>> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
>> at
>> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> at java.lang.Thread.run(Thread.java:695)
>> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema
>> in union null of union in field baz of com.foo.bar.simple_schema
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>> ... 16 more
>> Caused by: java.lang.NullPointerException
>> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
>> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
>> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
>> at
>> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>> at
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>> at
>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>>
>> Sample m/r job:
>> <mr_job>
>> package com.tts.ox.mapreduce.example.avro;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileWriter;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.mapred.AvroKey;
>> import org.apache.avro.mapreduce.AvroJob;
>> import org.apache.avro.mapreduce.AvroKeyInputFormat;
>> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
>> import org.apache.avro.mapreduce.AvroMultipleOutputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.conf.Configured;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.NullWritable;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.Mapper;
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> import org.apache.hadoop.util.GenericOptionsParser;
>> import org.apache.hadoop.util.Tool;
>> import org.apache.hadoop.util.ToolRunner;
>>
>> import java.io.File;
>> import java.io.IOException;
>>
>> public class AvroContainerFileDriver extends Configured implements Tool
>> {
>> //
>> // define a schema with a union of null and fixed
>> private static final String SCHEMA = "{\n" +
>> " \"namespace\": \"com.foo.bar\",\n" +
>> " \"name\": \"simple_schema\",\n" +
>> " \"type\": \"record\",\n" +
>> " \"fields\": [{\n" +
>> " \"name\": \"foo\",\n" +
>> " \"type\": {\n" +
>> " \"name\": \"bar\",\n" +
>> " \"type\": \"fixed\",\n" +
>> " \"size\": 2\n" +
>> " }\n" +
>> " }, {\n" +
>> " \"name\": \"baz\",\n" +
>> " \"type\": [\"null\", \"bar\"],\n" +
>> " \"default\": null\n" +
>> " }]\n" +
>> "}";
>>
>> public static class SampleMapper extends
>> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
>> private AvroMultipleOutputs amos;
>>
>> @Override
>> protected void setup(Context context) {
>> amos = new AvroMultipleOutputs(context);
>> }
>>
>> @Override
>> protected void cleanup(Context context) throws IOException,
>> InterruptedException {
>> amos.close();
>> }
>>
>> @Override
>> protected void map(AvroKey<GenericRecord> record, NullWritable
>> ignore, Context context)
>> throws IOException, InterruptedException {
>> // simply write the record to a container using AvroMultipleOutputs
>> amos.write("avro", new
>> AvroKey<GenericRecord>(record.datum()), NullWritable.get());
>> }
>> }
>>
>> @Override
>> public int run(final String[] args) throws Exception {
>> Schema.Parser parser = new Schema.Parser();
>> Schema schema = parser.parse(SCHEMA);
>>
>> //
>> // generate avro container file for input to mapper
>> byte[] dummy = {(byte) 0x01, (byte) 0x02};
>> GenericData.Fixed foo = new
>> GenericData.Fixed(schema.getField("foo").schema(), dummy);
>> GenericData.Fixed baz = new
>> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>>
>> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
>> .set(schema.getField("foo"), foo);
>> GenericRecord record0 = builder.build(); // baz is null
>>
>> builder.set(schema.getField("baz"), baz);
>> GenericRecord record1 = builder.build(); // baz is not null, bad
>> news
>>
>> File file = new File("/tmp/avrotest/input/test.avro");
>> DatumWriter<GenericRecord> datumWriter = new
>> GenericDatumWriter<GenericRecord>(schema);
>> DataFileWriter<GenericRecord> dataFileWriter = new
>> DataFileWriter<GenericRecord>(datumWriter);
>> dataFileWriter.create(schema, file);
>> dataFileWriter.append(record0);
>> //
>> // HELP: job succeeds when we do not have record with non-null
>> baz, comment out to succeed
>> //
>> dataFileWriter.append(record1);
>> dataFileWriter.close();
>>
>> //
>> // configure and run job
>> Configuration configuration = new Configuration();
>> String[] otherArgs = new GenericOptionsParser(configuration,
>> args).getRemainingArgs();
>> Job job = Job.getInstance(configuration, "Sample Avro Map
>> Reduce");
>>
>> job.setInputFormatClass(AvroKeyInputFormat.class);
>> AvroJob.setInputKeySchema(job, schema);
>>
>> job.setMapperClass(SampleMapper.class);
>> job.setNumReduceTasks(0);
>>
>> AvroJob.setOutputKeySchema(job, schema);
>> AvroMultipleOutputs.addNamedOutput(job, "avro",
>> AvroKeyOutputFormat.class, schema);
>>
>> FileInputFormat.addInputPath(job, new
>> Path(("/tmp/avrotest/input")));
>> FileOutputFormat.setOutputPath(job, new
>> Path("/tmp/avrotest/output"));
>>
>> return (job.waitForCompletion(true) ? 0 : 1);
>> }
>>
>> public static void main(String[] args) throws Exception {
>> int exitCode = ToolRunner.run(new AvroContainerFileDriver(),
>> args);
>> System.exit(exitCode);
>> }
>> }
>> </mr_job>
>>
>> Thanks,
>> John Pauley
>> Sr. Software Engineer
>> ThreatTrack Security
>>
>>
>
Re: [hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
Outside hadoop: avro-1.7.6
Inside hadoop: avro-mapred-1.7.6-hadoop2
From: Stanley Shi <ss...@gopivotal.com>>
Reply-To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Date: Monday, March 3, 2014 at 8:30 PM
To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
which avro version are you using when running outside of hadoop?
Regards,
Stanley Shi,
[http://www.gopivotal.com/files/media/logos/pivotal-logo-email-signature.png]
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>> wrote:
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
public class AvroContainerFileDriver extends Configured implements Tool {
//
// define a schema with a union of null and fixed
private static final String SCHEMA = "{\n" +
" \"namespace\": \"com.foo.bar\",\n" +
" \"name\": \"simple_schema\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [{\n" +
" \"name\": \"foo\",\n" +
" \"type\": {\n" +
" \"name\": \"bar\",\n" +
" \"type\": \"fixed\",\n" +
" \"size\": 2\n" +
" }\n" +
" }, {\n" +
" \"name\": \"baz\",\n" +
" \"type\": [\"null\", \"bar\"],\n" +
" \"default\": null\n" +
" }]\n" +
"}";
public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
private AvroMultipleOutputs amos;
@Override
protected void setup(Context context) {
amos = new AvroMultipleOutputs(context);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
amos.close();
}
@Override
protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
throws IOException, InterruptedException {
// simply write the record to a container using AvroMultipleOutputs
amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
}
}
@Override
public int run(final String[] args) throws Exception {
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(SCHEMA);
//
// generate avro container file for input to mapper
byte[] dummy = {(byte) 0x01, (byte) 0x02};
GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
GenericRecordBuilder builder = new GenericRecordBuilder(schema)
.set(schema.getField("foo"), foo);
GenericRecord record0 = builder.build(); // baz is null
builder.set(schema.getField("baz"), baz);
GenericRecord record1 = builder.build(); // baz is not null, bad news
File file = new File("/tmp/avrotest/input/test.avro");
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(record0);
//
// HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
//
dataFileWriter.append(record1);
dataFileWriter.close();
//
// configure and run job
Configuration configuration = new Configuration();
String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, schema);
job.setMapperClass(SampleMapper.class);
job.setNumReduceTasks(0);
AvroJob.setOutputKeySchema(job, schema);
AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
FileInputFormat.addInputPath(job, new Path(("/tmp/avrotest/input")));
FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
System.exit(exitCode);
}
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
Re: [hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
Outside hadoop: avro-1.7.6
Inside hadoop: avro-mapred-1.7.6-hadoop2
From: Stanley Shi <ss...@gopivotal.com>>
Reply-To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Date: Monday, March 3, 2014 at 8:30 PM
To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
which avro version are you using when running outside of hadoop?
Regards,
Stanley Shi,
[http://www.gopivotal.com/files/media/logos/pivotal-logo-email-signature.png]
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>> wrote:
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Reproduces an AvroMultipleOutputs failure: a record whose ["null", fixed]
 * union field is non-null triggers DataFileWriter$AppendWriteException
 * (NullPointerException in ReflectData) inside the mapper.
 *
 * The driver first generates an Avro container file under
 * /tmp/avrotest/input to use as map input, then runs a map-only job that
 * echoes each record through AvroMultipleOutputs.
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // define a schema with a union of null and fixed
    private static final String SCHEMA = "{\n" +
            " \"namespace\": \"com.foo.bar\",\n" +
            " \"name\": \"simple_schema\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"foo\",\n" +
            " \"type\": {\n" +
            " \"name\": \"bar\",\n" +
            " \"type\": \"fixed\",\n" +
            " \"size\": 2\n" +
            " }\n" +
            " }, {\n" +
            " \"name\": \"baz\",\n" +
            " \"type\": [\"null\", \"bar\"],\n" +
            " \"default\": null\n" +
            " }]\n" +
            "}";

    /** Map-only task: copies each input record to the "avro" named output. */
    public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close, or the named-output files are never flushed/committed.
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // simply write the record to a container using AvroMultipleOutputs
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the sample input file, then configures and runs the job.
     *
     * @param args command-line arguments; standard Hadoop options are consumed
     *             by GenericOptionsParser
     * @return 0 on job success, 1 on failure
     * @throws Exception on I/O or job-submission errors
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(SCHEMA);

        //
        // generate avro container file for input to mapper
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        // Union index 1 is the fixed branch ("bar"); index 0 is "null".
        GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);

        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        File file = new File("/tmp/avrotest/input/test.avro");
        // Fix: the input directory may not exist on a clean machine; create it
        // before DataFileWriter.create() opens the file.
        File parent = file.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IOException("could not create input directory " + parent);
        }
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        // Fix: close the writer even if create()/append() throws (resource leak).
        try {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        } finally {
            dataFileWriter.close();
        }

        //
        // configure and run job
        // Fix: honor the Configuration injected by ToolRunner (Configured/Tool
        // contract) instead of discarding it with "new Configuration()".
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        // Parsing applies standard Hadoop options (-D, -fs, ...) to the
        // configuration as a side effect; the remaining args are unused here.
        new GenericOptionsParser(configuration, args);
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // Fix: required for the job jar to ship to a real (non-local) cluster.
        job.setJarByClass(AvroContainerFileDriver.class);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only job

        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);

        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
        System.exit(exitCode);
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
Re: [hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
Outside hadoop: avro-1.7.6
Inside hadoop: avro-mapred-1.7.6-hadoop2
From: Stanley Shi <ss...@gopivotal.com>>
Reply-To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Date: Monday, March 3, 2014 at 8:30 PM
To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
which avro version are you using when running outside of hadoop?
Regards,
Stanley Shi,
[http://www.gopivotal.com/files/media/logos/pivotal-logo-email-signature.png]
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>> wrote:
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Reproduces an AvroMultipleOutputs failure: a record whose ["null", fixed]
 * union field is non-null triggers DataFileWriter$AppendWriteException
 * (NullPointerException in ReflectData) inside the mapper.
 *
 * The driver first generates an Avro container file under
 * /tmp/avrotest/input to use as map input, then runs a map-only job that
 * echoes each record through AvroMultipleOutputs.
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // define a schema with a union of null and fixed
    private static final String SCHEMA = "{\n" +
            " \"namespace\": \"com.foo.bar\",\n" +
            " \"name\": \"simple_schema\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"foo\",\n" +
            " \"type\": {\n" +
            " \"name\": \"bar\",\n" +
            " \"type\": \"fixed\",\n" +
            " \"size\": 2\n" +
            " }\n" +
            " }, {\n" +
            " \"name\": \"baz\",\n" +
            " \"type\": [\"null\", \"bar\"],\n" +
            " \"default\": null\n" +
            " }]\n" +
            "}";

    /** Map-only task: copies each input record to the "avro" named output. */
    public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close, or the named-output files are never flushed/committed.
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // simply write the record to a container using AvroMultipleOutputs
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the sample input file, then configures and runs the job.
     *
     * @param args command-line arguments; standard Hadoop options are consumed
     *             by GenericOptionsParser
     * @return 0 on job success, 1 on failure
     * @throws Exception on I/O or job-submission errors
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(SCHEMA);

        //
        // generate avro container file for input to mapper
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        // Union index 1 is the fixed branch ("bar"); index 0 is "null".
        GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);

        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        File file = new File("/tmp/avrotest/input/test.avro");
        // Fix: the input directory may not exist on a clean machine; create it
        // before DataFileWriter.create() opens the file.
        File parent = file.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IOException("could not create input directory " + parent);
        }
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        // Fix: close the writer even if create()/append() throws (resource leak).
        try {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        } finally {
            dataFileWriter.close();
        }

        //
        // configure and run job
        // Fix: honor the Configuration injected by ToolRunner (Configured/Tool
        // contract) instead of discarding it with "new Configuration()".
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        // Parsing applies standard Hadoop options (-D, -fs, ...) to the
        // configuration as a side effect; the remaining args are unused here.
        new GenericOptionsParser(configuration, args);
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // Fix: required for the job jar to ship to a real (non-local) cluster.
        job.setJarByClass(AvroContainerFileDriver.class);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only job

        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);

        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
        System.exit(exitCode);
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
Re: [hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
Outside hadoop: avro-1.7.6
Inside hadoop: avro-mapred-1.7.6-hadoop2
From: Stanley Shi <ss...@gopivotal.com>>
Reply-To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Date: Monday, March 3, 2014 at 8:30 PM
To: "user@hadoop.apache.org<ma...@hadoop.apache.org>" <us...@hadoop.apache.org>>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
which avro version are you using when running outside of hadoop?
Regards,
Stanley Shi,
[http://www.gopivotal.com/files/media/logos/pivotal-logo-email-signature.png]
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>> wrote:
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Reproduces an AvroMultipleOutputs failure: a record whose ["null", fixed]
 * union field is non-null triggers DataFileWriter$AppendWriteException
 * (NullPointerException in ReflectData) inside the mapper.
 *
 * The driver first generates an Avro container file under
 * /tmp/avrotest/input to use as map input, then runs a map-only job that
 * echoes each record through AvroMultipleOutputs.
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // define a schema with a union of null and fixed
    private static final String SCHEMA = "{\n" +
            " \"namespace\": \"com.foo.bar\",\n" +
            " \"name\": \"simple_schema\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"foo\",\n" +
            " \"type\": {\n" +
            " \"name\": \"bar\",\n" +
            " \"type\": \"fixed\",\n" +
            " \"size\": 2\n" +
            " }\n" +
            " }, {\n" +
            " \"name\": \"baz\",\n" +
            " \"type\": [\"null\", \"bar\"],\n" +
            " \"default\": null\n" +
            " }]\n" +
            "}";

    /** Map-only task: copies each input record to the "avro" named output. */
    public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close, or the named-output files are never flushed/committed.
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // simply write the record to a container using AvroMultipleOutputs
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the sample input file, then configures and runs the job.
     *
     * @param args command-line arguments; standard Hadoop options are consumed
     *             by GenericOptionsParser
     * @return 0 on job success, 1 on failure
     * @throws Exception on I/O or job-submission errors
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(SCHEMA);

        //
        // generate avro container file for input to mapper
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        // Union index 1 is the fixed branch ("bar"); index 0 is "null".
        GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);

        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        File file = new File("/tmp/avrotest/input/test.avro");
        // Fix: the input directory may not exist on a clean machine; create it
        // before DataFileWriter.create() opens the file.
        File parent = file.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IOException("could not create input directory " + parent);
        }
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        // Fix: close the writer even if create()/append() throws (resource leak).
        try {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        } finally {
            dataFileWriter.close();
        }

        //
        // configure and run job
        // Fix: honor the Configuration injected by ToolRunner (Configured/Tool
        // contract) instead of discarding it with "new Configuration()".
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        // Parsing applies standard Hadoop options (-D, -fs, ...) to the
        // configuration as a side effect; the remaining args are unused here.
        new GenericOptionsParser(configuration, args);
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // Fix: required for the job jar to ship to a real (non-local) cluster.
        job.setJarByClass(AvroContainerFileDriver.class);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only job

        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);

        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
        System.exit(exitCode);
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
which avro version are you using when running outside of hadoop?
Regards,
*Stanley Shi,*
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>wrote:
> This is cross posted to avro-user list (
> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
> ).
>
> Hello all,
>
> I’m having an issue using AvroMultipleOutputs in a map/reduce job. The
> issue occurs when using a schema that has a union of null and a fixed
> (among other complex types), default to null, and it is not null. Please
> find the full stack trace below and a sample map/reduce job that generates
> an Avro container file and uses that for the m/r input. Note that I can
> serialize/deserialize without issue using
> GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would
> be helpful.
>
> Stack trace:
> java.lang.Exception:
> org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> at
> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in
> union null of union in field baz of com.foo.bar.simple_schema
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
> ... 16 more
> Caused by: java.lang.NullPointerException
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
> at
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>
> Sample m/r job:
> <mr_job>
> package com.tts.ox.mapreduce.example.avro;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericRecordBuilder;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.mapred.AvroKey;
> import org.apache.avro.mapreduce.AvroJob;
> import org.apache.avro.mapreduce.AvroKeyInputFormat;
> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
> import org.apache.avro.mapreduce.AvroMultipleOutputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.GenericOptionsParser;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import java.io.File;
> import java.io.IOException;
>
> public class AvroContainerFileDriver extends Configured implements Tool {
> //
> // define a schema with a union of null and fixed
> private static final String SCHEMA = "{\n" +
> " \"namespace\": \"com.foo.bar\",\n" +
> " \"name\": \"simple_schema\",\n" +
> " \"type\": \"record\",\n" +
> " \"fields\": [{\n" +
> " \"name\": \"foo\",\n" +
> " \"type\": {\n" +
> " \"name\": \"bar\",\n" +
> " \"type\": \"fixed\",\n" +
> " \"size\": 2\n" +
> " }\n" +
> " }, {\n" +
> " \"name\": \"baz\",\n" +
> " \"type\": [\"null\", \"bar\"],\n" +
> " \"default\": null\n" +
> " }]\n" +
> "}";
>
> public static class SampleMapper extends
> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
> private AvroMultipleOutputs amos;
>
> @Override
> protected void setup(Context context) {
> amos = new AvroMultipleOutputs(context);
> }
>
> @Override
> protected void cleanup(Context context) throws IOException,
> InterruptedException {
> amos.close();
> }
>
> @Override
> protected void map(AvroKey<GenericRecord> record, NullWritable
> ignore, Context context)
> throws IOException, InterruptedException {
> // simply write the record to a container using AvroMultipleOutputs
> amos.write("avro", new AvroKey<GenericRecord>(record.datum()),
> NullWritable.get());
> }
> }
>
> @Override
> public int run(final String[] args) throws Exception {
> Schema.Parser parser = new Schema.Parser();
> Schema schema = parser.parse(SCHEMA);
>
> //
> // generate avro container file for input to mapper
> byte[] dummy = {(byte) 0x01, (byte) 0x02};
> GenericData.Fixed foo = new
> GenericData.Fixed(schema.getField("foo").schema(), dummy);
> GenericData.Fixed baz = new
> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>
> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
> .set(schema.getField("foo"), foo);
> GenericRecord record0 = builder.build(); // baz is null
>
> builder.set(schema.getField("baz"), baz);
> GenericRecord record1 = builder.build(); // baz is not null, bad
> news
>
> File file = new File("/tmp/avrotest/input/test.avro");
> DatumWriter<GenericRecord> datumWriter = new
> GenericDatumWriter<GenericRecord>(schema);
> DataFileWriter<GenericRecord> dataFileWriter = new
> DataFileWriter<GenericRecord>(datumWriter);
> dataFileWriter.create(schema, file);
> dataFileWriter.append(record0);
> //
> // HELP: job succeeds when we do not have record with non-null
> baz, comment out to succeed
> //
> dataFileWriter.append(record1);
> dataFileWriter.close();
>
> //
> // configure and run job
> Configuration configuration = new Configuration();
> String[] otherArgs = new GenericOptionsParser(configuration,
> args).getRemainingArgs();
> Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
>
> job.setInputFormatClass(AvroKeyInputFormat.class);
> AvroJob.setInputKeySchema(job, schema);
>
> job.setMapperClass(SampleMapper.class);
> job.setNumReduceTasks(0);
>
> AvroJob.setOutputKeySchema(job, schema);
> AvroMultipleOutputs.addNamedOutput(job, "avro",
> AvroKeyOutputFormat.class, schema);
>
> FileInputFormat.addInputPath(job, new
> Path(("/tmp/avrotest/input")));
> FileOutputFormat.setOutputPath(job, new
> Path("/tmp/avrotest/output"));
>
> return (job.waitForCompletion(true) ? 0 : 1);
> }
>
> public static void main(String[] args) throws Exception {
> int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
> System.exit(exitCode);
> }
> }
> </mr_job>
>
> Thanks,
> John Pauley
> Sr. Software Engineer
> ThreatTrack Security
>
>
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
which avro version are you using when running outside of hadoop?
Regards,
*Stanley Shi,*
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com>wrote:
> This is cross posted to avro-user list (
> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
> ).
>
> Hello all,
>
> I’m having an issue using AvroMultipleOutputs in a map/reduce job. The
> issue occurs when using a schema that has a union of null and a fixed
> (among other complex types), default to null, and it is not null. Please
> find the full stack trace below and a sample map/reduce job that generates
> an Avro container file and uses that for the m/r input. Note that I can
> serialize/deserialize without issue using
> GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would
> be helpful.
>
> Stack trace:
> java.lang.Exception:
> org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> at
> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in
> union null of union in field baz of com.foo.bar.simple_schema
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
> ... 16 more
> Caused by: java.lang.NullPointerException
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
> at
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>
> Sample m/r job:
> <mr_job>
> package com.tts.ox.mapreduce.example.avro;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericRecordBuilder;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.mapred.AvroKey;
> import org.apache.avro.mapreduce.AvroJob;
> import org.apache.avro.mapreduce.AvroKeyInputFormat;
> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
> import org.apache.avro.mapreduce.AvroMultipleOutputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.GenericOptionsParser;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import java.io.File;
> import java.io.IOException;
>
> public class AvroContainerFileDriver extends Configured implements Tool {
> //
> // define a schema with a union of null and fixed
> private static final String SCHEMA = "{\n" +
> " \"namespace\": \"com.foo.bar\",\n" +
> " \"name\": \"simple_schema\",\n" +
> " \"type\": \"record\",\n" +
> " \"fields\": [{\n" +
> " \"name\": \"foo\",\n" +
> " \"type\": {\n" +
> " \"name\": \"bar\",\n" +
> " \"type\": \"fixed\",\n" +
> " \"size\": 2\n" +
> " }\n" +
> " }, {\n" +
> " \"name\": \"baz\",\n" +
> " \"type\": [\"null\", \"bar\"],\n" +
> " \"default\": null\n" +
> " }]\n" +
> "}";
>
> public static class SampleMapper extends
> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
> private AvroMultipleOutputs amos;
>
> @Override
> protected void setup(Context context) {
> amos = new AvroMultipleOutputs(context);
> }
>
> @Override
> protected void cleanup(Context context) throws IOException,
> InterruptedException {
> amos.close();
> }
>
> @Override
> protected void map(AvroKey<GenericRecord> record, NullWritable
> ignore, Context context)
> throws IOException, InterruptedException {
> // simply write the record to a container using AvroMultipleOutputs
> amos.write("avro", new AvroKey<GenericRecord>(record.datum()),
> NullWritable.get());
> }
> }
>
> @Override
> public int run(final String[] args) throws Exception {
> Schema.Parser parser = new Schema.Parser();
> Schema schema = parser.parse(SCHEMA);
>
> //
> // generate avro container file for input to mapper
> byte[] dummy = {(byte) 0x01, (byte) 0x02};
> GenericData.Fixed foo = new
> GenericData.Fixed(schema.getField("foo").schema(), dummy);
> GenericData.Fixed baz = new
> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>
> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
> .set(schema.getField("foo"), foo);
> GenericRecord record0 = builder.build(); // baz is null
>
> builder.set(schema.getField("baz"), baz);
> GenericRecord record1 = builder.build(); // baz is not null, bad
> news
>
> File file = new File("/tmp/avrotest/input/test.avro");
> DatumWriter<GenericRecord> datumWriter = new
> GenericDatumWriter<GenericRecord>(schema);
> DataFileWriter<GenericRecord> dataFileWriter = new
> DataFileWriter<GenericRecord>(datumWriter);
> dataFileWriter.create(schema, file);
> dataFileWriter.append(record0);
> //
> // HELP: job succeeds when we do not have record with non-null
> baz, comment out to succeed
> //
> dataFileWriter.append(record1);
> dataFileWriter.close();
>
> //
> // configure and run job
> Configuration configuration = new Configuration();
> String[] otherArgs = new GenericOptionsParser(configuration,
> args).getRemainingArgs();
> Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
>
> job.setInputFormatClass(AvroKeyInputFormat.class);
> AvroJob.setInputKeySchema(job, schema);
>
> job.setMapperClass(SampleMapper.class);
> job.setNumReduceTasks(0);
>
> AvroJob.setOutputKeySchema(job, schema);
> AvroMultipleOutputs.addNamedOutput(job, "avro",
> AvroKeyOutputFormat.class, schema);
>
> FileInputFormat.addInputPath(job, new
> Path(("/tmp/avrotest/input")));
> FileOutputFormat.setOutputPath(job, new
> Path("/tmp/avrotest/output"));
>
> return (job.waitForCompletion(true) ? 0 : 1);
> }
>
> public static void main(String[] args) throws Exception {
> int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
> System.exit(exitCode);
> }
> }
> </mr_job>
>
> Thanks,
> John Pauley
> Sr. Software Engineer
> ThreatTrack Security
>
>
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
which avro version are you using when running outside of hadoop?
Regards,
*Stanley Shi,*
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com> wrote:
> This is cross posted to avro-user list (
> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
> ).
>
> Hello all,
>
> I’m having an issue using AvroMultipleOutputs in a map/reduce job. The
> issue occurs when using a schema that has a union of null and a fixed
> (among other complex types), default to null, and it is not null. Please
> find the full stack trace below and a sample map/reduce job that generates
> an Avro container file and uses that for the m/r input. Note that I can
> serialize/deserialize without issue using
> GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would
> be helpful.
>
> Stack trace:
> java.lang.Exception:
> org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> at
> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in
> union null of union in field baz of com.foo.bar.simple_schema
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
> ... 16 more
> Caused by: java.lang.NullPointerException
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
> at
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>
> Sample m/r job:
> <mr_job>
> package com.tts.ox.mapreduce.example.avro;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericRecordBuilder;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.mapred.AvroKey;
> import org.apache.avro.mapreduce.AvroJob;
> import org.apache.avro.mapreduce.AvroKeyInputFormat;
> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
> import org.apache.avro.mapreduce.AvroMultipleOutputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.GenericOptionsParser;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import java.io.File;
> import java.io.IOException;
>
> public class AvroContainerFileDriver extends Configured implements Tool {
> //
> // define a schema with a union of null and fixed
> private static final String SCHEMA = "{\n" +
> " \"namespace\": \"com.foo.bar\",\n" +
> " \"name\": \"simple_schema\",\n" +
> " \"type\": \"record\",\n" +
> " \"fields\": [{\n" +
> " \"name\": \"foo\",\n" +
> " \"type\": {\n" +
> " \"name\": \"bar\",\n" +
> " \"type\": \"fixed\",\n" +
> " \"size\": 2\n" +
> " }\n" +
> " }, {\n" +
> " \"name\": \"baz\",\n" +
> " \"type\": [\"null\", \"bar\"],\n" +
> " \"default\": null\n" +
> " }]\n" +
> "}";
>
> public static class SampleMapper extends
> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
> private AvroMultipleOutputs amos;
>
> @Override
> protected void setup(Context context) {
> amos = new AvroMultipleOutputs(context);
> }
>
> @Override
> protected void cleanup(Context context) throws IOException,
> InterruptedException {
> amos.close();
> }
>
> @Override
> protected void map(AvroKey<GenericRecord> record, NullWritable
> ignore, Context context)
> throws IOException, InterruptedException {
> // simply write the record to a container using AvroMultipleOutputs
> amos.write("avro", new AvroKey<GenericRecord>(record.datum()),
> NullWritable.get());
> }
> }
>
> @Override
> public int run(final String[] args) throws Exception {
> Schema.Parser parser = new Schema.Parser();
> Schema schema = parser.parse(SCHEMA);
>
> //
> // generate avro container file for input to mapper
> byte[] dummy = {(byte) 0x01, (byte) 0x02};
> GenericData.Fixed foo = new
> GenericData.Fixed(schema.getField("foo").schema(), dummy);
> GenericData.Fixed baz = new
> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>
> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
> .set(schema.getField("foo"), foo);
> GenericRecord record0 = builder.build(); // baz is null
>
> builder.set(schema.getField("baz"), baz);
> GenericRecord record1 = builder.build(); // baz is not null, bad
> news
>
> File file = new File("/tmp/avrotest/input/test.avro");
> DatumWriter<GenericRecord> datumWriter = new
> GenericDatumWriter<GenericRecord>(schema);
> DataFileWriter<GenericRecord> dataFileWriter = new
> DataFileWriter<GenericRecord>(datumWriter);
> dataFileWriter.create(schema, file);
> dataFileWriter.append(record0);
> //
> // HELP: job succeeds when we do not have record with non-null
> baz, comment out to succeed
> //
> dataFileWriter.append(record1);
> dataFileWriter.close();
>
> //
> // configure and run job
> Configuration configuration = new Configuration();
> String[] otherArgs = new GenericOptionsParser(configuration,
> args).getRemainingArgs();
> Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
>
> job.setInputFormatClass(AvroKeyInputFormat.class);
> AvroJob.setInputKeySchema(job, schema);
>
> job.setMapperClass(SampleMapper.class);
> job.setNumReduceTasks(0);
>
> AvroJob.setOutputKeySchema(job, schema);
> AvroMultipleOutputs.addNamedOutput(job, "avro",
> AvroKeyOutputFormat.class, schema);
>
> FileInputFormat.addInputPath(job, new
> Path(("/tmp/avrotest/input")));
> FileOutputFormat.setOutputPath(job, new
> Path("/tmp/avrotest/output"));
>
> return (job.waitForCompletion(true) ? 0 : 1);
> }
>
> public static void main(String[] args) throws Exception {
> int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
> System.exit(exitCode);
> }
> }
> </mr_job>
>
> Thanks,
> John Pauley
> Sr. Software Engineer
> ThreatTrack Security
>
>
Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by Stanley Shi <ss...@gopivotal.com>.
which avro version are you using when running outside of hadoop?
Regards,
*Stanley Shi,*
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <Jo...@threattrack.com> wrote:
> This is cross posted to avro-user list (
> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e
> ).
>
> Hello all,
>
> I’m having an issue using AvroMultipleOutputs in a map/reduce job. The
> issue occurs when using a schema that has a union of null and a fixed
> (among other complex types), default to null, and it is not null. Please
> find the full stack trace below and a sample map/reduce job that generates
> an Avro container file and uses that for the m/r input. Note that I can
> serialize/deserialize without issue using
> GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would
> be helpful.
>
> Stack trace:
> java.lang.Exception:
> org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null
> of union in field baz of com.foo.bar.simple_schema
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
> at
> org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
> at
> org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
> at
> com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> at
> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in
> union null of union in field baz of com.foo.bar.simple_schema
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
> ... 16 more
> Caused by: java.lang.NullPointerException
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
> at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
> at
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>
> Sample m/r job:
> <mr_job>
> package com.tts.ox.mapreduce.example.avro;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericRecordBuilder;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.mapred.AvroKey;
> import org.apache.avro.mapreduce.AvroJob;
> import org.apache.avro.mapreduce.AvroKeyInputFormat;
> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
> import org.apache.avro.mapreduce.AvroMultipleOutputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.GenericOptionsParser;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import java.io.File;
> import java.io.IOException;
>
> public class AvroContainerFileDriver extends Configured implements Tool {
> //
> // define a schema with a union of null and fixed
> private static final String SCHEMA = "{\n" +
> " \"namespace\": \"com.foo.bar\",\n" +
> " \"name\": \"simple_schema\",\n" +
> " \"type\": \"record\",\n" +
> " \"fields\": [{\n" +
> " \"name\": \"foo\",\n" +
> " \"type\": {\n" +
> " \"name\": \"bar\",\n" +
> " \"type\": \"fixed\",\n" +
> " \"size\": 2\n" +
> " }\n" +
> " }, {\n" +
> " \"name\": \"baz\",\n" +
> " \"type\": [\"null\", \"bar\"],\n" +
> " \"default\": null\n" +
> " }]\n" +
> "}";
>
> public static class SampleMapper extends
> Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
> private AvroMultipleOutputs amos;
>
> @Override
> protected void setup(Context context) {
> amos = new AvroMultipleOutputs(context);
> }
>
> @Override
> protected void cleanup(Context context) throws IOException,
> InterruptedException {
> amos.close();
> }
>
> @Override
> protected void map(AvroKey<GenericRecord> record, NullWritable
> ignore, Context context)
> throws IOException, InterruptedException {
> // simply write the record to a container using AvroMultipleOutputs
> amos.write("avro", new AvroKey<GenericRecord>(record.datum()),
> NullWritable.get());
> }
> }
>
> @Override
> public int run(final String[] args) throws Exception {
> Schema.Parser parser = new Schema.Parser();
> Schema schema = parser.parse(SCHEMA);
>
> //
> // generate avro container file for input to mapper
> byte[] dummy = {(byte) 0x01, (byte) 0x02};
> GenericData.Fixed foo = new
> GenericData.Fixed(schema.getField("foo").schema(), dummy);
> GenericData.Fixed baz = new
> GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>
> GenericRecordBuilder builder = new GenericRecordBuilder(schema)
> .set(schema.getField("foo"), foo);
> GenericRecord record0 = builder.build(); // baz is null
>
> builder.set(schema.getField("baz"), baz);
> GenericRecord record1 = builder.build(); // baz is not null, bad
> news
>
> File file = new File("/tmp/avrotest/input/test.avro");
> DatumWriter<GenericRecord> datumWriter = new
> GenericDatumWriter<GenericRecord>(schema);
> DataFileWriter<GenericRecord> dataFileWriter = new
> DataFileWriter<GenericRecord>(datumWriter);
> dataFileWriter.create(schema, file);
> dataFileWriter.append(record0);
> //
> // HELP: job succeeds when we do not have record with non-null
> baz, comment out to succeed
> //
> dataFileWriter.append(record1);
> dataFileWriter.close();
>
> //
> // configure and run job
> Configuration configuration = new Configuration();
> String[] otherArgs = new GenericOptionsParser(configuration,
> args).getRemainingArgs();
> Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
>
> job.setInputFormatClass(AvroKeyInputFormat.class);
> AvroJob.setInputKeySchema(job, schema);
>
> job.setMapperClass(SampleMapper.class);
> job.setNumReduceTasks(0);
>
> AvroJob.setOutputKeySchema(job, schema);
> AvroMultipleOutputs.addNamedOutput(job, "avro",
> AvroKeyOutputFormat.class, schema);
>
> FileInputFormat.addInputPath(job, new
> Path(("/tmp/avrotest/input")));
> FileOutputFormat.setOutputPath(job, new
> Path("/tmp/avrotest/output"));
>
> return (job.waitForCompletion(true) ? 0 : 1);
> }
>
> public static void main(String[] args) throws Exception {
> int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
> System.exit(exitCode);
> }
> }
> </mr_job>
>
> Thanks,
> John Pauley
> Sr. Software Engineer
> ThreatTrack Security
>
>
[hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Minimal reproduction driver: writes an Avro container file whose schema has
 * a ["null", fixed] union field, then runs a map-only job that copies each
 * record to a named output via {@link AvroMultipleOutputs}. A record with a
 * non-null fixed union member triggers the reported
 * DataFileWriter$AppendWriteException.
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // Schema with a union of null and the named fixed type "bar" (declared by
    // field "foo"), defaulting to null. Field "baz" being non-null is the
    // failing case.
    private static final String SCHEMA = "{\n" +
            " \"namespace\": \"com.foo.bar\",\n" +
            " \"name\": \"simple_schema\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"foo\",\n" +
            " \"type\": {\n" +
            " \"name\": \"bar\",\n" +
            " \"type\": \"fixed\",\n" +
            " \"size\": 2\n" +
            " }\n" +
            " }, {\n" +
            " \"name\": \"baz\",\n" +
            " \"type\": [\"null\", \"bar\"],\n" +
            " \"default\": null\n" +
            " }]\n" +
            "}";

    /**
     * Identity-style mapper: forwards every input record to the named output
     * "avro" via AvroMultipleOutputs; nothing is emitted on the normal map
     * output channel.
     */
    public static class SampleMapper
            extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {

        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close the multiple-outputs wrapper or the files stay unflushed.
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // Simply write the record to a container using AvroMultipleOutputs.
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the input container file under /tmp/avrotest/input, then
     * configures and runs the map-only job.
     *
     * @param args command-line arguments; generic Hadoop options are applied
     * @return 0 when the job succeeds, 1 otherwise
     * @throws Exception on I/O or job-submission failure
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA);
        //
        // Generate avro container file for input to mapper: record0 leaves
        // baz null, record1 sets it to a fixed value (the failing case).
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        GenericData.Fixed baz =
                new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        // FIX: create the input directory instead of assuming it exists.
        File file = new File("/tmp/avrotest/input/test.avro");
        File inputDir = file.getParentFile();
        if (inputDir != null && !inputDir.exists() && !inputDir.mkdirs()) {
            throw new IOException("Could not create input directory " + inputDir);
        }

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter =
                new DataFileWriter<GenericRecord>(datumWriter);
        // FIX: close the writer even when create()/append() throws (the
        // original leaked the file handle on any write error).
        try {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        } finally {
            dataFileWriter.close();
        }

        //
        // Configure and run job.
        // FIX: start from the Configuration injected by ToolRunner instead of
        // discarding it (the original built a fresh Configuration and ignored
        // both getConf() and the parsed remaining arguments).
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // FIX: without setJarByClass the job cannot locate this class on a
        // real cluster (harmless under LocalJobRunner).
        job.setJarByClass(AvroContainerFileDriver.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);
        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only job
        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AvroContainerFileDriver(), args));
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
[hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I'm having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Minimal reproduction driver: writes an Avro container file whose schema has
 * a ["null", fixed] union field, then runs a map-only job that copies each
 * record to a named output via {@link AvroMultipleOutputs}. A record with a
 * non-null fixed union member triggers the reported
 * DataFileWriter$AppendWriteException.
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // Schema with a union of null and the named fixed type "bar" (declared by
    // field "foo"), defaulting to null. Field "baz" being non-null is the
    // failing case.
    private static final String SCHEMA = "{\n" +
            " \"namespace\": \"com.foo.bar\",\n" +
            " \"name\": \"simple_schema\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"foo\",\n" +
            " \"type\": {\n" +
            " \"name\": \"bar\",\n" +
            " \"type\": \"fixed\",\n" +
            " \"size\": 2\n" +
            " }\n" +
            " }, {\n" +
            " \"name\": \"baz\",\n" +
            " \"type\": [\"null\", \"bar\"],\n" +
            " \"default\": null\n" +
            " }]\n" +
            "}";

    /**
     * Identity-style mapper: forwards every input record to the named output
     * "avro" via AvroMultipleOutputs; nothing is emitted on the normal map
     * output channel.
     */
    public static class SampleMapper
            extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {

        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close the multiple-outputs wrapper or the files stay unflushed.
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // Simply write the record to a container using AvroMultipleOutputs.
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the input container file under /tmp/avrotest/input, then
     * configures and runs the map-only job.
     *
     * @param args command-line arguments; generic Hadoop options are applied
     * @return 0 when the job succeeds, 1 otherwise
     * @throws Exception on I/O or job-submission failure
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA);
        //
        // Generate avro container file for input to mapper: record0 leaves
        // baz null, record1 sets it to a fixed value (the failing case).
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        GenericData.Fixed baz =
                new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        // FIX: create the input directory instead of assuming it exists.
        File file = new File("/tmp/avrotest/input/test.avro");
        File inputDir = file.getParentFile();
        if (inputDir != null && !inputDir.exists() && !inputDir.mkdirs()) {
            throw new IOException("Could not create input directory " + inputDir);
        }

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter =
                new DataFileWriter<GenericRecord>(datumWriter);
        // FIX: close the writer even when create()/append() throws (the
        // original leaked the file handle on any write error).
        try {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        } finally {
            dataFileWriter.close();
        }

        //
        // Configure and run job.
        // FIX: start from the Configuration injected by ToolRunner instead of
        // discarding it (the original built a fresh Configuration and ignored
        // both getConf() and the parsed remaining arguments).
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // FIX: without setJarByClass the job cannot locate this class on a
        // real cluster (harmless under LocalJobRunner).
        job.setJarByClass(AvroContainerFileDriver.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);
        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only job
        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AvroContainerFileDriver(), args));
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
[hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I'm having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
public class AvroContainerFileDriver extends Configured implements Tool {
//
// define a schema with a union of null and fixed
private static final String SCHEMA = "{\n" +
" \"namespace\": \"com.foo.bar\",\n" +
" \"name\": \"simple_schema\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [{\n" +
" \"name\": \"foo\",\n" +
" \"type\": {\n" +
" \"name\": \"bar\",\n" +
" \"type\": \"fixed\",\n" +
" \"size\": 2\n" +
" }\n" +
" }, {\n" +
" \"name\": \"baz\",\n" +
" \"type\": [\"null\", \"bar\"],\n" +
" \"default\": null\n" +
" }]\n" +
"}";
public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
private AvroMultipleOutputs amos;
@Override
protected void setup(Context context) {
amos = new AvroMultipleOutputs(context);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
amos.close();
}
@Override
protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
throws IOException, InterruptedException {
// simply write the record to a container using AvroMultipleOutputs
amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
}
}
@Override
public int run(final String[] args) throws Exception {
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(SCHEMA);
//
// generate avro container file for input to mapper
byte[] dummy = {(byte) 0x01, (byte) 0x02};
GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
GenericRecordBuilder builder = new GenericRecordBuilder(schema)
.set(schema.getField("foo"), foo);
GenericRecord record0 = builder.build(); // baz is null
builder.set(schema.getField("baz"), baz);
GenericRecord record1 = builder.build(); // baz is not null, bad news
File file = new File("/tmp/avrotest/input/test.avro");
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(record0);
//
// HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
//
dataFileWriter.append(record1);
dataFileWriter.close();
//
// configure and run job
Configuration configuration = new Configuration();
String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, schema);
job.setMapperClass(SampleMapper.class);
job.setNumReduceTasks(0);
AvroJob.setOutputKeySchema(job, schema);
AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
FileInputFormat.addInputPath(job, new Path(("/tmp/avrotest/input")));
FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
System.exit(exitCode);
}
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
[hadoop] AvroMultipleOutputs
org.apache.avro.file.DataFileWriter$AppendWriteException
Posted by John Pauley <Jo...@threattrack.com>.
This is cross posted to avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).
Hello all,
I'm having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a union of null and a fixed (among other complex types), default to null, and it is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 16 more
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
/**
 * Minimal reproduction driver: writes an Avro container file whose schema has
 * a ["null", fixed] union field, then runs a map-only job that copies each
 * record to a named output via {@link AvroMultipleOutputs}. A record with a
 * non-null fixed union member triggers the reported
 * DataFileWriter$AppendWriteException.
 */
public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // Schema with a union of null and the named fixed type "bar" (declared by
    // field "foo"), defaulting to null. Field "baz" being non-null is the
    // failing case.
    private static final String SCHEMA = "{\n" +
            " \"namespace\": \"com.foo.bar\",\n" +
            " \"name\": \"simple_schema\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"foo\",\n" +
            " \"type\": {\n" +
            " \"name\": \"bar\",\n" +
            " \"type\": \"fixed\",\n" +
            " \"size\": 2\n" +
            " }\n" +
            " }, {\n" +
            " \"name\": \"baz\",\n" +
            " \"type\": [\"null\", \"bar\"],\n" +
            " \"default\": null\n" +
            " }]\n" +
            "}";

    /**
     * Identity-style mapper: forwards every input record to the named output
     * "avro" via AvroMultipleOutputs; nothing is emitted on the normal map
     * output channel.
     */
    public static class SampleMapper
            extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {

        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close the multiple-outputs wrapper or the files stay unflushed.
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // Simply write the record to a container using AvroMultipleOutputs.
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    /**
     * Generates the input container file under /tmp/avrotest/input, then
     * configures and runs the map-only job.
     *
     * @param args command-line arguments; generic Hadoop options are applied
     * @return 0 when the job succeeds, 1 otherwise
     * @throws Exception on I/O or job-submission failure
     */
    @Override
    public int run(final String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA);
        //
        // Generate avro container file for input to mapper: record0 leaves
        // baz null, record1 sets it to a fixed value (the failing case).
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        GenericData.Fixed baz =
                new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null
        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        // FIX: create the input directory instead of assuming it exists.
        File file = new File("/tmp/avrotest/input/test.avro");
        File inputDir = file.getParentFile();
        if (inputDir != null && !inputDir.exists() && !inputDir.mkdirs()) {
            throw new IOException("Could not create input directory " + inputDir);
        }

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter =
                new DataFileWriter<GenericRecord>(datumWriter);
        // FIX: close the writer even when create()/append() throws (the
        // original leaked the file handle on any write error).
        try {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(record0);
            //
            // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed
            //
            dataFileWriter.append(record1);
        } finally {
            dataFileWriter.close();
        }

        //
        // Configure and run job.
        // FIX: start from the Configuration injected by ToolRunner instead of
        // discarding it (the original built a fresh Configuration and ignored
        // both getConf() and the parsed remaining arguments).
        Configuration configuration = getConf() != null ? getConf() : new Configuration();
        new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        // FIX: without setJarByClass the job cannot locate this class on a
        // real cluster (harmless under LocalJobRunner).
        job.setJarByClass(AvroContainerFileDriver.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);
        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0); // map-only job
        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AvroContainerFileDriver(), args));
    }
}
</mr_job>
Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security