You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/09/14 01:40:11 UTC

svn commit: r1384600 - in /avro/trunk: CHANGES.txt lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java

Author: cutting
Date: Thu Sep 13 23:40:10 2012
New Revision: 1384600

URL: http://svn.apache.org/viewvc?rev=1384600&view=rev
Log:
AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support reflection.  Contributed by Alexandre Normand.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1384600&r1=1384599&r2=1384600&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Sep 13 23:40:10 2012
@@ -28,6 +28,9 @@ Avro 1.7.2 (unreleased)
     AVRO-1129. C: Detect when avro_schema_decref frees schema.
     (Maxim Pugachev via dcreager)
 
+    AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support
+    reflection.  (Alexandre Normand via cutting)
+
   BUG FIXES
 
     AVRO-1128. Java: Fix SpecificRecordBase#equals() to work for

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1384600&r1=1384599&r2=1384600&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Thu Sep 13 23:40:10 2012
@@ -27,8 +27,8 @@ import org.apache.avro.hadoop.io.AvroDat
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -69,7 +69,7 @@ public class AvroKeyValueRecordWriter<K,
 
     // Create an Avro container file and a writer to it.
     mAvroFileWriter = new DataFileWriter<GenericRecord>(
-        new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema));
+        new ReflectDatumWriter<GenericRecord>(mKeyValuePairSchema));
     mAvroFileWriter.setCodec(compressionCodec);
     mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
 

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java?rev=1384600&r1=1384599&r2=1384600&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java Thu Sep 13 23:40:10 2012
@@ -34,6 +34,8 @@ import org.apache.avro.hadoop.io.AvroDat
 import org.apache.avro.hadoop.io.AvroKeyValue;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -101,4 +103,56 @@ public class TestAvroKeyValueRecordWrite
     assertFalse(avroFileReader.hasNext());
     avroFileReader.close();
   }
+
+  public static class R1 {
+    String attribute;
+  }
+  @Test public void testUsingReflection() throws Exception {
+    Job job = new Job();
+    Schema schema = ReflectData.get().getSchema(R1.class);
+    AvroJob.setOutputValueSchema(job, schema);
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+    replay(context);
+
+    R1 record = new R1();
+    record.attribute = "test";
+    AvroValue<R1> avroValue = new AvroValue<R1>(record);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    AvroDatumConverterFactory factory =
+      new AvroDatumConverterFactory(job.getConfiguration());
+
+    AvroDatumConverter<Text, ?> keyConverter = factory.create(Text.class);
+
+    @SuppressWarnings("unchecked")
+    AvroDatumConverter<AvroValue<R1>, R1> valueConverter =
+      factory.create((Class<AvroValue<R1>>) avroValue.getClass());
+
+    AvroKeyValueRecordWriter<Text, AvroValue<R1>> writer =
+      new AvroKeyValueRecordWriter<Text, AvroValue<R1>>(keyConverter,
+        valueConverter, CodecFactory.nullCodec(), outputStream);
+
+    writer.write(new Text("reflectionData"), avroValue);
+    writer.close(context);
+
+    verify(context);
+
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+    Schema readerSchema = AvroKeyValue.getSchema(
+      Schema.create(Schema.Type.STRING), schema);
+    DatumReader<GenericRecord> datumReader =
+      new ReflectDatumReader<GenericRecord>(readerSchema);
+    DataFileStream<GenericRecord> avroFileReader =
+      new DataFileStream<GenericRecord>(inputStream, datumReader);
+
+    // Verify that the first record was written.
+    assertTrue(avroFileReader.hasNext());
+
+    // Verify that the record holds the same data that we've written
+    AvroKeyValue<CharSequence, R1> firstRecord =
+      new AvroKeyValue<CharSequence, R1>(avroFileReader.next());
+    assertNotNull(firstRecord.get());
+    assertEquals("reflectionData", firstRecord.getKey().toString());
+    assertEquals(record.attribute, firstRecord.getValue().attribute);
+  }
 }