You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/09/14 01:40:11 UTC
svn commit: r1384600 - in /avro/trunk: CHANGES.txt
lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
Author: cutting
Date: Thu Sep 13 23:40:10 2012
New Revision: 1384600
URL: http://svn.apache.org/viewvc?rev=1384600&view=rev
Log:
AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support reflection. Contributed by Alexandre Normand.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1384600&r1=1384599&r2=1384600&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Sep 13 23:40:10 2012
@@ -28,6 +28,9 @@ Avro 1.7.2 (unreleased)
AVRO-1129. C: Detect when avro_schema_decref frees schema.
(Maxim Pugachev via dcreager)
+ AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support
+ reflection. (Alexandre Normand via cutting)
+
BUG FIXES
AVRO-1128. Java: Fix SpecificRecordBase#equals() to work for
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1384600&r1=1384599&r2=1384600&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Thu Sep 13 23:40:10 2012
@@ -27,8 +27,8 @@ import org.apache.avro.hadoop.io.AvroDat
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -69,7 +69,7 @@ public class AvroKeyValueRecordWriter<K,
// Create an Avro container file and a writer to it.
mAvroFileWriter = new DataFileWriter<GenericRecord>(
- new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema));
+ new ReflectDatumWriter<GenericRecord>(mKeyValuePairSchema));
mAvroFileWriter.setCodec(compressionCodec);
mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java?rev=1384600&r1=1384599&r2=1384600&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java Thu Sep 13 23:40:10 2012
@@ -34,6 +34,8 @@ import org.apache.avro.hadoop.io.AvroDat
import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -101,4 +103,56 @@ public class TestAvroKeyValueRecordWrite
assertFalse(avroFileReader.hasNext());
avroFileReader.close();
}
+
+ public static class R1 {
+ String attribute;
+ }
+ @Test public void testUsingReflection() throws Exception {
+ Job job = new Job();
+ Schema schema = ReflectData.get().getSchema(R1.class);
+ AvroJob.setOutputValueSchema(job, schema);
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+ replay(context);
+
+ R1 record = new R1();
+ record.attribute = "test";
+ AvroValue<R1> avroValue = new AvroValue<R1>(record);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ AvroDatumConverterFactory factory =
+ new AvroDatumConverterFactory(job.getConfiguration());
+
+ AvroDatumConverter<Text, ?> keyConverter = factory.create(Text.class);
+
+ @SuppressWarnings("unchecked")
+ AvroDatumConverter<AvroValue<R1>, R1> valueConverter =
+ factory.create((Class<AvroValue<R1>>) avroValue.getClass());
+
+ AvroKeyValueRecordWriter<Text, AvroValue<R1>> writer =
+ new AvroKeyValueRecordWriter<Text, AvroValue<R1>>(keyConverter,
+ valueConverter, CodecFactory.nullCodec(), outputStream);
+
+ writer.write(new Text("reflectionData"), avroValue);
+ writer.close(context);
+
+ verify(context);
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ Schema readerSchema = AvroKeyValue.getSchema(
+ Schema.create(Schema.Type.STRING), schema);
+ DatumReader<GenericRecord> datumReader =
+ new ReflectDatumReader<GenericRecord>(readerSchema);
+ DataFileStream<GenericRecord> avroFileReader =
+ new DataFileStream<GenericRecord>(inputStream, datumReader);
+
+ // Verify that the first record was written.
+ assertTrue(avroFileReader.hasNext());
+
+ // Verify that the record holds the same data that we've written
+ AvroKeyValue<CharSequence, R1> firstRecord =
+ new AvroKeyValue<CharSequence, R1>(avroFileReader.next());
+ assertNotNull(firstRecord.get());
+ assertEquals("reflectionData", firstRecord.getKey().toString());
+ assertEquals(record.attribute, firstRecord.getValue().attribute);
+ }
}