You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/26 00:52:56 UTC

svn commit: r807847 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/reflect/ src/java/org/apache/avro/specific/ src/test/java/org/apache/avro/

Author: cutting
Date: Tue Aug 25 22:52:55 2009
New Revision: 807847

URL: http://svn.apache.org/viewvc?rev=807847&view=rev
Log:
AVRO-104.  Permit null fields with Java reflection.  Contributed by Eelco Hillenius.

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
    hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
    hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Aug 25 22:52:55 2009
@@ -11,6 +11,9 @@
 
     AVRO-76. Add Java RPC plugin framework.  (George Porter)
 
+    AVRO-104. Permit null fields in Java reflection.
+    (Eelco Hillenius via cutting)
+
   IMPROVEMENTS
 
     AVRO-71.  C++: make deserializer more generic.  (Scott Banachowski

Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java Tue Aug 25 22:52:55 2009
@@ -23,6 +23,7 @@
 import java.lang.reflect.ParameterizedType;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -45,10 +46,26 @@
 
 /** Utilities to use existing Java classes and interfaces via reflection. */
 public class ReflectData {
-  private ReflectData() {}
+  
+  /** Create a {@link ReflectData} instance that permits record fields to be
+   * null.  The schema generated for each field is a union of its declared type
+   * and null. */
+  public static ReflectData newNullAllowingInstance() {
+    return new ReflectData() {
+      @Override
+        protected Schema createFieldSchema(Field field,
+                                           Map<String, Schema> names) {
+        Schema schema = super.createFieldSchema(field, names);
+        return Schema.createUnion(Arrays.asList(new Schema[] { schema,
+            Schema.create(Schema.Type.NULL) }));
+      }
+    };
+  }
+  
+  public ReflectData() {}
   
   /** Returns true if an object matches a schema. */
-  public static boolean validate(Schema schema, Object datum) {
+  public boolean validate(Schema schema, Object datum) {
     switch (schema.getType()) {
     case RECORD:
       Class c = datum.getClass(); 
@@ -100,7 +117,7 @@
     }
   }
 
-  private static final WeakHashMap<java.lang.reflect.Type,Schema> SCHEMA_CACHE =
+  private final WeakHashMap<java.lang.reflect.Type,Schema> schemaCache =
     new WeakHashMap<java.lang.reflect.Type,Schema>();
 
   /** Generate a schema for a Java type.
@@ -109,18 +126,23 @@
    * <p>Note that unions cannot be automatically generated by this method,
    * since Java provides no representation for unions.</p>
    */
-  public static Schema getSchema(java.lang.reflect.Type type) {
-    Schema schema = SCHEMA_CACHE.get(type);
+  public Schema getSchema(java.lang.reflect.Type type) {
+    Schema schema = schemaCache.get(type);
     if (schema == null) {
       schema = createSchema(type, new LinkedHashMap<String,Schema>());
-      SCHEMA_CACHE.put(type, schema);
+      schemaCache.put(type, schema);
     }
     return schema;
   }
 
+  /**
+   * Create a schema for a type and its fields. Note that by design only fields
+   * of the direct class, not its superclasses, are used for creating the
+   * schema.  Also, fields are not permitted to be null.
+   */
   @SuppressWarnings(value="unchecked")
-  private static Schema createSchema(java.lang.reflect.Type type,
-                                     Map<String,Schema> names) {
+  protected Schema createSchema(java.lang.reflect.Type type,
+                                Map<String,Schema> names) {
     if (type == Utf8.class)
       return Schema.create(Type.STRING);
     else if (type == ByteBuffer.class)
@@ -188,7 +210,7 @@
           names.put(name, schema);
         for (Field field : c.getDeclaredFields())
           if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0) {
-            Schema fieldSchema = createSchema(field.getGenericType(), names);
+            Schema fieldSchema = createFieldSchema(field, names);
             fields.put(field.getName(), new Schema.Field(fieldSchema, null));
           }
         schema.setFields(fields);
@@ -198,13 +220,18 @@
     throw new AvroTypeException("Unknown type: "+type);
   }
 
+  /** Create a schema for a field. */
+  protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
+    return createSchema(field.getGenericType(), names);
+  }
+
   /** Generate a protocol for a Java interface.
    * <p>Note that this requires that <a
    * href="http://paranamer.codehaus.org/">Paranamer</a> is run over compiled
    * interface declarations, since Java 6 reflection does not provide access to
    * method parameter names.  See Avro's build.xml for an example. </p>
    */
-  public static Protocol getProtocol(Class iface) {
+  public Protocol getProtocol(Class iface) {
     Protocol protocol =
       new Protocol(iface.getSimpleName(), iface.getPackage().getName()); 
     for (Method method : iface.getDeclaredMethods())
@@ -224,13 +251,13 @@
     return protocol;
   }
 
-  private static final Paranamer PARANAMER = new CachingParanamer();
+  private final Paranamer paranamer = new CachingParanamer();
 
-  private static Message getMessage(Method method, Protocol protocol) {
+  private Message getMessage(Method method, Protocol protocol) {
     Map<String,Schema> names = protocol.getTypes();
     LinkedHashMap<String,Schema.Field> fields =
       new LinkedHashMap<String,Schema.Field>();
-    String[] paramNames = PARANAMER.lookupParameterNames(method);
+    String[] paramNames = paranamer.lookupParameterNames(method);
     java.lang.reflect.Type[] paramTypes = method.getGenericParameterTypes();
     for (int i = 0; i < paramTypes.length; i++)
       fields.put(paramNames[i],

Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java Tue Aug 25 22:52:55 2009
@@ -30,10 +30,23 @@
  * via Java reflection.
  */
 public class ReflectDatumWriter extends GenericDatumWriter<Object> {
-  public ReflectDatumWriter() {}
+  private final ReflectData reflectData;
+  
+  public ReflectDatumWriter() {
+    this(new ReflectData());
+  }
 
   public ReflectDatumWriter(Schema root) {
+    this(root, new ReflectData());
+  }
+
+  public ReflectDatumWriter(Schema root, ReflectData reflectData) {
     super(root);
+    this.reflectData = reflectData;
+  }
+  
+  public ReflectDatumWriter(ReflectData reflectData) {
+    this.reflectData = reflectData;
   }
   
   protected Object getField(Object record, String name, int position) {
@@ -55,12 +68,12 @@
 
   @Override
   protected boolean isRecord(Object datum) {
-    return ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
+    return reflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
   }
 
   @Override
   protected Schema getRecordSchema(Object record) {
-    return ReflectData.getSchema(record.getClass());
+    return reflectData.getSchema(record.getClass());
   }
 
 }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java Tue Aug 25 22:52:55 2009
@@ -40,11 +40,21 @@
   
   public ReflectRequestor(Class<?> iface, Transceiver transceiver)
     throws IOException {
-    this(ReflectData.getProtocol(iface), transceiver);
+    this(iface, transceiver, new ReflectData());
   }
 
   protected ReflectRequestor(Protocol protocol, Transceiver transceiver)
     throws IOException {
+    this(protocol, transceiver, new ReflectData());
+  }
+
+  public ReflectRequestor(Class<?> iface, Transceiver transceiver, ReflectData reflectData)
+    throws IOException {
+    this(reflectData.getProtocol(iface), transceiver, reflectData);
+  }
+
+  protected ReflectRequestor(Protocol protocol, Transceiver transceiver, ReflectData reflectData)
+    throws IOException {
     super(protocol, transceiver);
     this.packageName = protocol.getNamespace()+"."+protocol.getName()+"$";
   }
@@ -82,7 +92,13 @@
   /** Create a proxy instance whose methods invoke RPCs. */
   public static Object getClient(Class<?> iface, Transceiver transciever)
     throws IOException {
-    Protocol protocol = ReflectData.getProtocol(iface);
+    return getClient(iface, transciever, new ReflectData());
+  }
+
+  /** Create a proxy instance whose methods invoke RPCs. */
+  public static Object getClient(Class<?> iface, Transceiver transciever, ReflectData reflectData)
+    throws IOException {
+    Protocol protocol = reflectData.getProtocol(iface);
     return Proxy.newProxyInstance(iface.getClassLoader(),
                                   new Class[] { iface },
                                   new ReflectRequestor(protocol, transciever));

Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java Tue Aug 25 22:52:55 2009
@@ -42,7 +42,11 @@
   protected String packageName;
 
   public ReflectResponder(Class iface, Object impl) {
-    super(ReflectData.getProtocol(iface));
+    this(iface, impl, new ReflectData());
+  }
+  
+  public ReflectResponder(Class iface, Object impl, ReflectData reflectData) {
+    super(reflectData.getProtocol(iface));
     this.impl = impl;
     this.packageName = getLocal().getNamespace()+"."+getLocal().getName()+"$";
   }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java Tue Aug 25 22:52:55 2009
@@ -34,7 +34,12 @@
   
   public SpecificRequestor(Class<?> iface, Transceiver transceiver)
     throws IOException {
-    this(ReflectData.getProtocol(iface), transceiver);
+    this(iface, transceiver, new ReflectData());
+  }
+
+  public SpecificRequestor(Class<?> iface, Transceiver transceiver, ReflectData reflectData)
+    throws IOException {
+    this(reflectData.getProtocol(iface), transceiver);
   }
   
   private SpecificRequestor(Protocol protocol, Transceiver transceiver)
@@ -53,7 +58,13 @@
   /** Create a proxy instance whose methods invoke RPCs. */
   public static Object getClient(Class<?> iface, Transceiver transciever)
     throws IOException {
-    Protocol protocol = ReflectData.getProtocol(iface);
+    return getClient(iface, transciever, new ReflectData());
+  }
+
+  /** Create a proxy instance whose methods invoke RPCs. */
+  public static Object getClient(Class<?> iface, Transceiver transciever, ReflectData reflectData)
+    throws IOException {
+    Protocol protocol = reflectData.getProtocol(iface);
     return Proxy.newProxyInstance(iface.getClassLoader(),
                                   new Class[] { iface },
                                   new SpecificRequestor(protocol, transciever));

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java Tue Aug 25 22:52:55 2009
@@ -32,7 +32,11 @@
   @Override
   public boolean equals(Object that) {
     if (that instanceof BarRecord) {
-      return this.beerMsg.equals(((BarRecord) that).beerMsg);
+      if (this.beerMsg == null) {
+        return ((BarRecord) that).beerMsg == null;
+      } else {
+        return this.beerMsg.equals(((BarRecord) that).beerMsg);
+      }
     }
     return false;
   }

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java Tue Aug 25 22:52:55 2009
@@ -47,9 +47,10 @@
   public void testMultiReflectWithUnionBeforeWriting() throws IOException {
     FileOutputStream fos = new FileOutputStream(FILE);
 
+    ReflectData reflectData = new ReflectData();
     List<Schema> schemas = Arrays.asList(new Schema[] {
-        ReflectData.getSchema(FooRecord.class),
-        ReflectData.getSchema(BarRecord.class) });
+        reflectData.getSchema(FooRecord.class),
+        reflectData.getSchema(BarRecord.class) });
     Schema union = Schema.createUnion(schemas);
     DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
         new ReflectDatumWriter(union));
@@ -82,8 +83,9 @@
   public void testMultiReflectWithUntionAfterWriting() throws IOException {
     FileOutputStream fos = new FileOutputStream(FILE);
 
+    ReflectData reflectData = new ReflectData();
     List<Schema> schemas = new ArrayList<Schema>();
-    schemas.add(ReflectData.getSchema(FooRecord.class));
+    schemas.add(reflectData.getSchema(FooRecord.class));
     Schema union = Schema.createUnion(schemas);
     DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
         new ReflectDatumWriter(union));
@@ -94,7 +96,7 @@
     write(writer, new FooRecord(15), check);
 
     // we have a new type, add it to the file
-    writer.addSchema(ReflectData.getSchema(BarRecord.class));
+    writer.addSchema(reflectData.getSchema(BarRecord.class));
 
     // test writing those new types to a file
     write(writer, new BarRecord("One beer please"), check);
@@ -120,6 +122,38 @@
     reader.close();
   }
 
+  /*
+   * Test writing a record that has a field that is null.
+   */
+  @Test
+  public void testNull() throws IOException {
+    FileOutputStream fos = new FileOutputStream(FILE);
+
+    ReflectData reflectData = ReflectData.newNullAllowingInstance();
+    Schema schema = reflectData.getSchema(BarRecord.class);
+    DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
+        new ReflectDatumWriter(schema, reflectData));
+
+    // test writing to a file
+    CheckList check = new CheckList();
+    write(writer, new BarRecord("One beer please"), check);
+    // record with a null field here; fails when using the default reflectData instance
+    write(writer, new BarRecord(), check);
+    write(writer, new BarRecord("Two beers please"), check);
+    writer.close();
+
+    ReflectDatumReader din = new ReflectDatumReader("org.apache.avro.");
+    SeekableFileInput sin = new SeekableFileInput(FILE);
+    DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
+    Object datum = null;
+    long count = reader.getMetaLong("count");
+    for (int i = 0; i < count; i++) {
+      datum = reader.next(datum);
+      check.assertEquals(datum, i);
+    }
+    reader.close();
+  }
+
   private void write(DataFileWriter<Object> writer, Object o, CheckList l)
       throws IOException {
     writer.append(l.addAndReturn(o));

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java Tue Aug 25 22:52:55 2009
@@ -17,16 +17,24 @@
  */
 package org.apache.avro;
 
-import org.apache.avro.reflect.*;
-import org.apache.avro.io.*;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.TestReflect.SampleRecord.AnotherSampleRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.test.Simple;
 import org.apache.avro.test.Simple.TestRecord;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
-import java.io.*;
 
 public class TestReflect {
   private static final Logger LOG
@@ -45,23 +53,18 @@
   @Test
   public void testSchema() throws IOException {
     assertEquals(PROTOCOL.getTypes().get("TestRecord"),
-                 ReflectData.getSchema(TestRecord.class));
+                 new ReflectData().getSchema(TestRecord.class));
   }
 
   @Test
   public void testProtocol() throws IOException {
-    assertEquals(PROTOCOL, ReflectData.getProtocol(Simple.class));
+    assertEquals(PROTOCOL, new ReflectData().getProtocol(Simple.class));
   }
 
   @Test
   public void testRecord() throws IOException {
-    Schema schm = ReflectData.getSchema(SampleRecord.class);
-    Class<?> c = SampleRecord.class;
-    String prefix =  
-      ((c.getEnclosingClass() == null 
-        || "null".equals(c.getEnclosingClass())) ? 
-       c.getPackage().getName() + "." 
-       : (c.getEnclosingClass().getName() + "$"));
+    Schema schm = new ReflectData().getSchema(SampleRecord.class);
+    String prefix = getPrefix(SampleRecord.class);
     ReflectDatumWriter writer = new ReflectDatumWriter(schm);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     SampleRecord record = new SampleRecord();
@@ -75,6 +78,35 @@
     assertEquals(record, decoded);
   }
 
+  @Test
+  public void testRecordWithNull() throws IOException {
+    ReflectData reflectData = ReflectData.newNullAllowingInstance();
+    Schema schm = reflectData.getSchema(AnotherSampleRecord.class);
+    String prefix = getPrefix(AnotherSampleRecord.class);
+    ReflectDatumWriter writer = new ReflectDatumWriter(schm);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // keep record.a null and see if that works
+    AnotherSampleRecord a = new AnotherSampleRecord();
+    writer.write(a, new BinaryEncoder(out));
+    AnotherSampleRecord b = new AnotherSampleRecord(10);
+    writer.write(b, new BinaryEncoder(out));
+    ReflectDatumReader reader = new ReflectDatumReader(schm, prefix);
+    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+    Object decoded = reader.read(null, new BinaryDecoder(in));
+    assertEquals(a, decoded);
+    decoded = reader.read(null, new BinaryDecoder(in));
+    assertEquals(b, decoded);
+  }
+
+  private String getPrefix(Class<?> c) {
+    String prefix =  
+      ((c.getEnclosingClass() == null 
+        || "null".equals(c.getEnclosingClass())) ? 
+       c.getPackage().getName() + "." 
+       : (c.getEnclosingClass().getName() + "$"));
+    return prefix;
+  }
+
   public static class SampleRecord {
     public int x = 1;
     private int y = 2;
@@ -97,5 +129,27 @@
         return false;
       return true;
     }
+    
+    public static class AnotherSampleRecord {
+      private Integer a = null;
+      
+      public AnotherSampleRecord() {
+      }
+      
+      AnotherSampleRecord(Integer a) {
+        this.a = a;
+      }
+
+      public int hashCode() {
+        return (a != null ? a.hashCode() : 0);
+      }
+
+      public boolean equals(Object other) {
+        if (other instanceof AnotherSampleRecord) {
+          return this.a == ((AnotherSampleRecord)other).a;
+        }
+        return false;
+      }
+    }
   }
 }