You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/26 00:52:56 UTC
svn commit: r807847 - in /hadoop/avro/trunk: ./
src/java/org/apache/avro/reflect/ src/java/org/apache/avro/specific/
src/test/java/org/apache/avro/
Author: cutting
Date: Tue Aug 25 22:52:55 2009
New Revision: 807847
URL: http://svn.apache.org/viewvc?rev=807847&view=rev
Log:
AVRO-104. Permit null fields with Java reflection. Contributed by Eelco Hillenius.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Aug 25 22:52:55 2009
@@ -11,6 +11,9 @@
AVRO-76. Add Java RPC plugin framework. (George Porter)
+ AVRO-104. Permit null fields in Java reflection.
+ (Eelco Hillenius via cutting)
+
IMPROVEMENTS
AVRO-71. C++: make deserializer more generic. (Scott Banachowski
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java Tue Aug 25 22:52:55 2009
@@ -23,6 +23,7 @@
import java.lang.reflect.ParameterizedType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -45,10 +46,26 @@
/** Utilities to use existing Java classes and interfaces via reflection. */
public class ReflectData {
- private ReflectData() {}
+
+ /** Create a {@link ReflectData} instance that permits record fields to be
+ * null. The schema generated for each field is a union of its declared type
+ * and null. */
+ public static ReflectData newNullAllowingInstance() {
+ return new ReflectData() {
+ @Override
+ protected Schema createFieldSchema(Field field,
+ Map<String, Schema> names) {
+ Schema schema = super.createFieldSchema(field, names);
+ return Schema.createUnion(Arrays.asList(new Schema[] { schema,
+ Schema.create(Schema.Type.NULL) }));
+ }
+ };
+ }
+
+ public ReflectData() {}
/** Returns true if an object matches a schema. */
- public static boolean validate(Schema schema, Object datum) {
+ public boolean validate(Schema schema, Object datum) {
switch (schema.getType()) {
case RECORD:
Class c = datum.getClass();
@@ -100,7 +117,7 @@
}
}
- private static final WeakHashMap<java.lang.reflect.Type,Schema> SCHEMA_CACHE =
+ private final WeakHashMap<java.lang.reflect.Type,Schema> schemaCache =
new WeakHashMap<java.lang.reflect.Type,Schema>();
/** Generate a schema for a Java type.
@@ -109,18 +126,23 @@
* <p>Note that unions cannot be automatically generated by this method,
* since Java provides no representation for unions.</p>
*/
- public static Schema getSchema(java.lang.reflect.Type type) {
- Schema schema = SCHEMA_CACHE.get(type);
+ public Schema getSchema(java.lang.reflect.Type type) {
+ Schema schema = schemaCache.get(type);
if (schema == null) {
schema = createSchema(type, new LinkedHashMap<String,Schema>());
- SCHEMA_CACHE.put(type, schema);
+ schemaCache.put(type, schema);
}
return schema;
}
+ /**
+ * Create a schema for a type and its fields. Note that by design only fields
+ * of the direct class, not its super classes, are used for creating the
+ * schema. Also, fields are not permitted to be null.
+ */
@SuppressWarnings(value="unchecked")
- private static Schema createSchema(java.lang.reflect.Type type,
- Map<String,Schema> names) {
+ protected Schema createSchema(java.lang.reflect.Type type,
+ Map<String,Schema> names) {
if (type == Utf8.class)
return Schema.create(Type.STRING);
else if (type == ByteBuffer.class)
@@ -188,7 +210,7 @@
names.put(name, schema);
for (Field field : c.getDeclaredFields())
if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0) {
- Schema fieldSchema = createSchema(field.getGenericType(), names);
+ Schema fieldSchema = createFieldSchema(field, names);
fields.put(field.getName(), new Schema.Field(fieldSchema, null));
}
schema.setFields(fields);
@@ -198,13 +220,18 @@
throw new AvroTypeException("Unknown type: "+type);
}
+ /** Create a schema for a field. */
+ protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
+ return createSchema(field.getGenericType(), names);
+ }
+
/** Generate a protocol for a Java interface.
* <p>Note that this requires that <a
* href="http://paranamer.codehaus.org/">Paranamer</a> is run over compiled
* interface declarations, since Java 6 reflection does not provide access to
* method parameter names. See Avro's build.xml for an example. </p>
*/
- public static Protocol getProtocol(Class iface) {
+ public Protocol getProtocol(Class iface) {
Protocol protocol =
new Protocol(iface.getSimpleName(), iface.getPackage().getName());
for (Method method : iface.getDeclaredMethods())
@@ -224,13 +251,13 @@
return protocol;
}
- private static final Paranamer PARANAMER = new CachingParanamer();
+ private final Paranamer paranamer = new CachingParanamer();
- private static Message getMessage(Method method, Protocol protocol) {
+ private Message getMessage(Method method, Protocol protocol) {
Map<String,Schema> names = protocol.getTypes();
LinkedHashMap<String,Schema.Field> fields =
new LinkedHashMap<String,Schema.Field>();
- String[] paramNames = PARANAMER.lookupParameterNames(method);
+ String[] paramNames = paranamer.lookupParameterNames(method);
java.lang.reflect.Type[] paramTypes = method.getGenericParameterTypes();
for (int i = 0; i < paramTypes.length; i++)
fields.put(paramNames[i],
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java Tue Aug 25 22:52:55 2009
@@ -30,10 +30,23 @@
* via Java reflection.
*/
public class ReflectDatumWriter extends GenericDatumWriter<Object> {
- public ReflectDatumWriter() {}
+ private final ReflectData reflectData;
+
+ public ReflectDatumWriter() {
+ this(new ReflectData());
+ }
public ReflectDatumWriter(Schema root) {
+ this(root, new ReflectData());
+ }
+
+ public ReflectDatumWriter(Schema root, ReflectData reflectData) {
super(root);
+ this.reflectData = reflectData;
+ }
+
+ public ReflectDatumWriter(ReflectData reflectData) {
+ this.reflectData = reflectData;
}
protected Object getField(Object record, String name, int position) {
@@ -55,12 +68,12 @@
@Override
protected boolean isRecord(Object datum) {
- return ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
+ return reflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
}
@Override
protected Schema getRecordSchema(Object record) {
- return ReflectData.getSchema(record.getClass());
+ return reflectData.getSchema(record.getClass());
}
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java Tue Aug 25 22:52:55 2009
@@ -40,11 +40,21 @@
public ReflectRequestor(Class<?> iface, Transceiver transceiver)
throws IOException {
- this(ReflectData.getProtocol(iface), transceiver);
+ this(iface, transceiver, new ReflectData());
}
protected ReflectRequestor(Protocol protocol, Transceiver transceiver)
throws IOException {
+ this(protocol, transceiver, new ReflectData());
+ }
+
+ public ReflectRequestor(Class<?> iface, Transceiver transceiver, ReflectData reflectData)
+ throws IOException {
+ this(reflectData.getProtocol(iface), transceiver, reflectData);
+ }
+
+ protected ReflectRequestor(Protocol protocol, Transceiver transceiver, ReflectData reflectData)
+ throws IOException {
super(protocol, transceiver);
this.packageName = protocol.getNamespace()+"."+protocol.getName()+"$";
}
@@ -82,7 +92,13 @@
/** Create a proxy instance whose methods invoke RPCs. */
public static Object getClient(Class<?> iface, Transceiver transciever)
throws IOException {
- Protocol protocol = ReflectData.getProtocol(iface);
+ return getClient(iface, transciever, new ReflectData());
+ }
+
+ /** Create a proxy instance whose methods invoke RPCs. */
+ public static Object getClient(Class<?> iface, Transceiver transciever, ReflectData reflectData)
+ throws IOException {
+ Protocol protocol = reflectData.getProtocol(iface);
return Proxy.newProxyInstance(iface.getClassLoader(),
new Class[] { iface },
new ReflectRequestor(protocol, transciever));
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java Tue Aug 25 22:52:55 2009
@@ -42,7 +42,11 @@
protected String packageName;
public ReflectResponder(Class iface, Object impl) {
- super(ReflectData.getProtocol(iface));
+ this(iface, impl, new ReflectData());
+ }
+
+ public ReflectResponder(Class iface, Object impl, ReflectData reflectData) {
+ super(reflectData.getProtocol(iface));
this.impl = impl;
this.packageName = getLocal().getNamespace()+"."+getLocal().getName()+"$";
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java Tue Aug 25 22:52:55 2009
@@ -34,7 +34,12 @@
public SpecificRequestor(Class<?> iface, Transceiver transceiver)
throws IOException {
- this(ReflectData.getProtocol(iface), transceiver);
+ this(iface, transceiver, new ReflectData());
+ }
+
+ public SpecificRequestor(Class<?> iface, Transceiver transceiver, ReflectData reflectData)
+ throws IOException {
+ this(reflectData.getProtocol(iface), transceiver);
}
private SpecificRequestor(Protocol protocol, Transceiver transceiver)
@@ -53,7 +58,13 @@
/** Create a proxy instance whose methods invoke RPCs. */
public static Object getClient(Class<?> iface, Transceiver transciever)
throws IOException {
- Protocol protocol = ReflectData.getProtocol(iface);
+ return getClient(iface, transciever, new ReflectData());
+ }
+
+ /** Create a proxy instance whose methods invoke RPCs. */
+ public static Object getClient(Class<?> iface, Transceiver transciever, ReflectData reflectData)
+ throws IOException {
+ Protocol protocol = reflectData.getProtocol(iface);
return Proxy.newProxyInstance(iface.getClassLoader(),
new Class[] { iface },
new SpecificRequestor(protocol, transciever));
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java Tue Aug 25 22:52:55 2009
@@ -32,7 +32,11 @@
@Override
public boolean equals(Object that) {
if (that instanceof BarRecord) {
- return this.beerMsg.equals(((BarRecord) that).beerMsg);
+ if (this.beerMsg == null) {
+ return ((BarRecord) that).beerMsg == null;
+ } else {
+ return this.beerMsg.equals(((BarRecord) that).beerMsg);
+ }
}
return false;
}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java Tue Aug 25 22:52:55 2009
@@ -47,9 +47,10 @@
public void testMultiReflectWithUnionBeforeWriting() throws IOException {
FileOutputStream fos = new FileOutputStream(FILE);
+ ReflectData reflectData = new ReflectData();
List<Schema> schemas = Arrays.asList(new Schema[] {
- ReflectData.getSchema(FooRecord.class),
- ReflectData.getSchema(BarRecord.class) });
+ reflectData.getSchema(FooRecord.class),
+ reflectData.getSchema(BarRecord.class) });
Schema union = Schema.createUnion(schemas);
DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
new ReflectDatumWriter(union));
@@ -82,8 +83,9 @@
public void testMultiReflectWithUntionAfterWriting() throws IOException {
FileOutputStream fos = new FileOutputStream(FILE);
+ ReflectData reflectData = new ReflectData();
List<Schema> schemas = new ArrayList<Schema>();
- schemas.add(ReflectData.getSchema(FooRecord.class));
+ schemas.add(reflectData.getSchema(FooRecord.class));
Schema union = Schema.createUnion(schemas);
DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
new ReflectDatumWriter(union));
@@ -94,7 +96,7 @@
write(writer, new FooRecord(15), check);
// we have a new type, add it to the file
- writer.addSchema(ReflectData.getSchema(BarRecord.class));
+ writer.addSchema(reflectData.getSchema(BarRecord.class));
// test writing those new types to a file
write(writer, new BarRecord("One beer please"), check);
@@ -120,6 +122,38 @@
reader.close();
}
+ /*
+ * Test writing a record with a field that is null.
+ */
+ @Test
+ public void testNull() throws IOException {
+ FileOutputStream fos = new FileOutputStream(FILE);
+
+ ReflectData reflectData = ReflectData.newNullAllowingInstance();
+ Schema schema = reflectData.getSchema(BarRecord.class);
+ DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
+ new ReflectDatumWriter(schema, reflectData));
+
+ // test writing to a file
+ CheckList check = new CheckList();
+ write(writer, new BarRecord("One beer please"), check);
+ // record with a null field here; fails when using the default ReflectData instance
+ write(writer, new BarRecord(), check);
+ write(writer, new BarRecord("Two beers please"), check);
+ writer.close();
+
+ ReflectDatumReader din = new ReflectDatumReader("org.apache.avro.");
+ SeekableFileInput sin = new SeekableFileInput(FILE);
+ DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
+ Object datum = null;
+ long count = reader.getMetaLong("count");
+ for (int i = 0; i < count; i++) {
+ datum = reader.next(datum);
+ check.assertEquals(datum, i);
+ }
+ reader.close();
+ }
+
private void write(DataFileWriter<Object> writer, Object o, CheckList l)
throws IOException {
writer.append(l.addAndReturn(o));
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java?rev=807847&r1=807846&r2=807847&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java Tue Aug 25 22:52:55 2009
@@ -17,16 +17,24 @@
*/
package org.apache.avro;
-import org.apache.avro.reflect.*;
-import org.apache.avro.io.*;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.TestReflect.SampleRecord.AnotherSampleRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.test.Simple;
import org.apache.avro.test.Simple.TestRecord;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
-import java.io.*;
public class TestReflect {
private static final Logger LOG
@@ -45,23 +53,18 @@
@Test
public void testSchema() throws IOException {
assertEquals(PROTOCOL.getTypes().get("TestRecord"),
- ReflectData.getSchema(TestRecord.class));
+ new ReflectData().getSchema(TestRecord.class));
}
@Test
public void testProtocol() throws IOException {
- assertEquals(PROTOCOL, ReflectData.getProtocol(Simple.class));
+ assertEquals(PROTOCOL, new ReflectData().getProtocol(Simple.class));
}
@Test
public void testRecord() throws IOException {
- Schema schm = ReflectData.getSchema(SampleRecord.class);
- Class<?> c = SampleRecord.class;
- String prefix =
- ((c.getEnclosingClass() == null
- || "null".equals(c.getEnclosingClass())) ?
- c.getPackage().getName() + "."
- : (c.getEnclosingClass().getName() + "$"));
+ Schema schm = new ReflectData().getSchema(SampleRecord.class);
+ String prefix = getPrefix(SampleRecord.class);
ReflectDatumWriter writer = new ReflectDatumWriter(schm);
ByteArrayOutputStream out = new ByteArrayOutputStream();
SampleRecord record = new SampleRecord();
@@ -75,6 +78,35 @@
assertEquals(record, decoded);
}
+ @Test
+ public void testRecordWithNull() throws IOException {
+ ReflectData reflectData = ReflectData.newNullAllowingInstance();
+ Schema schm = reflectData.getSchema(AnotherSampleRecord.class);
+ String prefix = getPrefix(AnotherSampleRecord.class);
+ ReflectDatumWriter writer = new ReflectDatumWriter(schm);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // keep record.a null and see if that works
+ AnotherSampleRecord a = new AnotherSampleRecord();
+ writer.write(a, new BinaryEncoder(out));
+ AnotherSampleRecord b = new AnotherSampleRecord(10);
+ writer.write(b, new BinaryEncoder(out));
+ ReflectDatumReader reader = new ReflectDatumReader(schm, prefix);
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ Object decoded = reader.read(null, new BinaryDecoder(in));
+ assertEquals(a, decoded);
+ decoded = reader.read(null, new BinaryDecoder(in));
+ assertEquals(b, decoded);
+ }
+
+ private String getPrefix(Class<?> c) {
+ String prefix =
+ ((c.getEnclosingClass() == null
+ || "null".equals(c.getEnclosingClass())) ?
+ c.getPackage().getName() + "."
+ : (c.getEnclosingClass().getName() + "$"));
+ return prefix;
+ }
+
public static class SampleRecord {
public int x = 1;
private int y = 2;
@@ -97,5 +129,27 @@
return false;
return true;
}
+
+ public static class AnotherSampleRecord {
+ private Integer a = null;
+
+ public AnotherSampleRecord() {
+ }
+
+ AnotherSampleRecord(Integer a) {
+ this.a = a;
+ }
+
+ public int hashCode() {
+ return (a != null ? a.hashCode() : 0);
+ }
+
+ public boolean equals(Object other) {
+ if (other instanceof AnotherSampleRecord) {
+ return this.a == ((AnotherSampleRecord)other).a;
+ }
+ return false;
+ }
+ }
}
}