You are viewing a plain text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@avro.apache.org by to...@apache.org on 2015/07/14 16:07:24 UTC
svn commit: r1690939 - in /avro/trunk: ./
lang/java/avro/src/main/java/org/apache/avro/generic/
lang/java/avro/src/main/java/org/apache/avro/reflect/
lang/java/avro/src/test/java/org/apache/avro/
Author: tomwhite
Date: Tue Jul 14 14:07:23 2015
New Revision: 1690939
URL: http://svn.apache.org/r1690939
Log:
AVRO-1692. Allow more than one logical type for a Java class. Contributed by Ryan Blue.
Added:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jul 14 14:07:23 2015
@@ -98,6 +98,9 @@ Trunk (not yet released)
AVRO-1697. Ruby: Add support for the Snappy codec to the Ruby library.
(Daniel Schierbeck via tomwhite)
+ AVRO-1692. Allow more than one logical type for a Java class. (blue via
+ tomwhite)
+
BUG FIXES
AVRO-1553. Java: MapReduce never uses MapOutputValueSchema (tomwhite)
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Tue Jul 14 14:07:23 2015
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
import java.util.WeakHashMap;
import java.util.Iterator;
import java.util.List;
@@ -94,39 +95,69 @@ public class GenericData {
/** Return the class loader that's used (by subclasses). */
public ClassLoader getClassLoader() { return classLoader; }
- public Map<String, Conversion<?>> conversions =
+ private Map<String, Conversion<?>> conversions =
new HashMap<String, Conversion<?>>();
- public Map<Class<?>, Conversion<?>> conversionsByClass =
- new IdentityHashMap<Class<?>, Conversion<?>>();
+ private Map<Class<?>, Map<String, Conversion<?>>> conversionsByClass =
+ new IdentityHashMap<Class<?>, Map<String, Conversion<?>>>();
+ /**
+ * Registers the given conversion to be used when reading and writing with
+ * this data model.
+ *
+ * @param conversion a logical type Conversion.
+ */
public void addLogicalTypeConversion(Conversion<?> conversion) {
conversions.put(conversion.getLogicalTypeName(), conversion);
- conversionsByClass.put(conversion.getConvertedType(), conversion);
+ Class<?> type = conversion.getConvertedType();
+ if (conversionsByClass.containsKey(type)) {
+ conversionsByClass.get(type).put(
+ conversion.getLogicalTypeName(), conversion);
+ } else {
+ Map<String, Conversion<?>> conversions = new LinkedHashMap<String, Conversion<?>>();
+ conversions.put(conversion.getLogicalTypeName(), conversion);
+ conversionsByClass.put(type, conversions);
+ }
}
+ /**
+ * Returns the first conversion found for the given class.
+ *
+ * @param datumClass a Class
+ * @return the first registered conversion for the class, or null
+ */
@SuppressWarnings("unchecked")
- public <T> Conversion<? super T> getConversionFrom(Class<T> datumClass,
- LogicalType logicalType) {
- Conversion<?> conversion = conversionsByClass.get(datumClass);
- if (conversion != null &&
- conversion.getLogicalTypeName().equals(logicalType.getName())) {
- return (Conversion<T>) conversion;
+ public <T> Conversion<T> getConversionByClass(Class<T> datumClass) {
+ Map<String, Conversion<?>> conversions = conversionsByClass.get(datumClass);
+ if (conversions != null) {
+ return (Conversion<T>) conversions.values().iterator().next();
}
return null;
}
+ /**
+ * Returns the conversion for the given class and logical type.
+ *
+ * @param datumClass a Class
+ * @param logicalType a LogicalType
+ * @return the conversion for the class and logical type, or null
+ */
@SuppressWarnings("unchecked")
- public <T> Conversion<? extends T> getConversionTo(Class<T> datumClass,
- LogicalType logicalType) {
- Conversion<?> conversion = conversionsByClass.get(datumClass);
- if (conversion != null &&
- conversion.getLogicalTypeName().equals(logicalType.getName())) {
- return (Conversion<T>) conversion;
+ public <T> Conversion<T> getConversionByClass(Class<T> datumClass,
+ LogicalType logicalType) {
+ Map<String, Conversion<?>> conversions = conversionsByClass.get(datumClass);
+ if (conversions != null) {
+ return (Conversion<T>) conversions.get(logicalType.getName());
}
return null;
}
+ /**
+ * Returns the Conversion for the given logical type.
+ *
+ * @param logicalType a logical type
+ * @return the conversion for the logical type, or null
+ */
@SuppressWarnings("unchecked")
public Conversion<Object> getConversionFor(LogicalType logicalType) {
if (logicalType == null) {
@@ -657,15 +688,16 @@ public class GenericData {
// this allows logical type concrete classes to overlap with supported ones
// for example, a conversion could return a map
if (datum != null) {
- Conversion<?> conversion = conversionsByClass.get(datum.getClass());
- if (conversion != null) {
- String logicalTypeName = conversion.getLogicalTypeName();
+ Map<String, Conversion<?>> conversions = conversionsByClass.get(datum.getClass());
+ if (conversions != null) {
List<Schema> candidates = union.getTypes();
for (int i = 0; i < candidates.size(); i += 1) {
- LogicalType candidateLogicalType = candidates.get(i).getLogicalType();
- if (candidateLogicalType != null &&
- logicalTypeName.equals(candidateLogicalType.getName())) {
- return i;
+ LogicalType candidateType = candidates.get(i).getLogicalType();
+ if (candidateType != null) {
+ Conversion<?> conversion = conversions.get(candidateType.getName());
+ if (conversion != null) {
+ return i;
+ }
}
}
}
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java Tue Jul 14 14:07:23 2015
@@ -66,7 +66,7 @@ public class GenericDatumWriter<D> imple
LogicalType logicalType = schema.getLogicalType();
if (datum != null && logicalType != null) {
Conversion<?> conversion = getData()
- .getConversionFrom(datum.getClass(), logicalType);
+ .getConversionByClass(datum.getClass(), logicalType);
writeWithoutConversion(schema,
convert(schema, logicalType, conversion, datum), out);
} else {
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Tue Jul 14 14:07:23 2015
@@ -561,10 +561,9 @@ public class ReflectData extends Specifi
return Schema.create(Schema.Type.BYTES);
if (Collection.class.isAssignableFrom(c)) // array
throw new AvroRuntimeException("Can't find element type of Collection");
- for (Conversion<?> conversion : conversions.values()) { // logical type
- if (conversion.getConvertedType().isAssignableFrom(c)) {
- return conversion.getRecommendedSchema();
- }
+ Conversion<?> conversion = getConversionByClass(c);
+ if (conversion != null) {
+ return conversion.getRecommendedSchema();
}
String fullName = c.getName();
Schema schema = names.get(fullName);
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java Tue Jul 14 14:07:23 2015
@@ -285,7 +285,7 @@ public class ReflectDatumReader<T> exten
}
LogicalType logicalType = f.schema().getLogicalType();
if (logicalType != null) {
- Conversion<?> conversion = getData().getConversionTo(
+ Conversion<?> conversion = getData().getConversionByClass(
accessor.getField().getType(), logicalType);
if (conversion != null) {
try {
Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java?rev=1690939&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java Tue Jul 14 14:07:23 2015
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Round-trips a circular parent/child record structure through an Avro data
+ * file using two logical types whose conversions both target {@link Record}:
+ * "referenceable" marks a record identified by a long id field, and
+ * "reference" marks a record whose field points back at a referenceable
+ * record; that field is written as the referenced record's id.
+ */
+public class TestCircularReferences {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  /**
+   * Logical type for a record with a field, named by the "ref-field-name"
+   * schema property, that refers to a referenceable record.
+   */
+  public static class Reference extends LogicalType {
+    private static final String REFERENCE = "reference";
+    private static final String REF_FIELD_NAME = "ref-field-name";
+
+    private final String refFieldName;
+
+    public Reference(String refFieldName) {
+      super(REFERENCE);
+      this.refFieldName = refFieldName;
+    }
+
+    /** Reads the reference field name from the "ref-field-name" schema property. */
+    public Reference(Schema schema) {
+      super(REFERENCE);
+      this.refFieldName = schema.getProp(REF_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(REF_FIELD_NAME, refFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCE;
+    }
+
+    public String getRefFieldName() {
+      return refFieldName;
+    }
+
+    /** Additionally rejects schemas that have no field named by this reference. */
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getField(refFieldName) == null) {
+        throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName);
+      }
+    }
+  }
+
+  /**
+   * Logical type for a record that can be referred to through its long id
+   * field, named by the "id-field-name" schema property.
+   */
+  public static class Referenceable extends LogicalType {
+    private static final String REFERENCEABLE = "referenceable";
+    private static final String ID_FIELD_NAME = "id-field-name";
+
+    private final String idFieldName;
+
+    public Referenceable(String idFieldName) {
+      super(REFERENCEABLE);
+      this.idFieldName = idFieldName;
+    }
+
+    /** Reads the id field name from the "id-field-name" schema property. */
+    public Referenceable(Schema schema) {
+      super(REFERENCEABLE);
+      this.idFieldName = schema.getProp(ID_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(ID_FIELD_NAME, idFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCEABLE;
+    }
+
+    public String getIdFieldName() {
+      return idFieldName;
+    }
+
+    /** Additionally rejects schemas whose id field is missing or not a long. */
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      Schema.Field idField = schema.getField(idFieldName);
+      if (idField == null || idField.schema().getType() != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField);
+      }
+    }
+  }
+
+  /** Registers factories that build both logical types from schema properties. */
+  @BeforeClass
+  public static void addReferenceTypes() {
+    LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Referenceable(schema);
+      }
+    });
+    LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Reference(schema);
+      }
+    });
+  }
+
+  /**
+   * Holds the id/record bookkeeping shared by the two conversions: on write,
+   * references are replaced by ids; on read, ids are resolved back to records.
+   */
+  public static class ReferenceManager {
+    /** Receives a referenceable record once it has been read. */
+    private interface Callback {
+      void set(Object referenceable);
+    }
+
+    // read side: id -> referenceable record already read
+    private final Map<Long, Object> references = new HashMap<Long, Object>();
+    // write side: referenceable record (by identity) -> its id
+    private final Map<Object, Long> ids = new IdentityHashMap<Object, Long>();
+    // read side: id -> pending references to a record that has not been read yet
+    private final Map<Long, List<Callback>> callbacksById = new HashMap<Long, List<Callback>>();
+    private final ReferenceableTracker tracker = new ReferenceableTracker();
+    private final ReferenceHandler handler = new ReferenceHandler();
+
+    public ReferenceableTracker getTracker() {
+      return tracker;
+    }
+
+    public ReferenceHandler getHandler() {
+      return handler;
+    }
+
+    /** Conversion for "referenceable": remembers ids on write and records by id on read. */
+    public class ReferenceableTracker extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Referenceable.REFERENCEABLE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // read side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        references.put(id, value);
+
+        // resolve references to this id that were read before this record; a
+        // record may have no pending references, so the list can be null
+        List<Callback> callbacks = callbacksById.remove(id);
+        if (callbacks != null) {
+          for (Callback callback : callbacks) {
+            callback.set(value);
+          }
+        }
+
+        return value;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // write side: remember this record's id so references to it can be
+        // written as the id
+        long id = getId(value, schema);
+        ids.put(value, id);
+
+        return value;
+      }
+
+      /** Returns the value of the id field named by the schema's Referenceable type. */
+      private long getId(IndexedRecord referenceable, Schema schema) {
+        Referenceable info = (Referenceable) schema.getLogicalType();
+        int idField = schema.getField(info.getIdFieldName()).pos();
+        return (Long) referenceable.get(idField);
+      }
+    }
+
+    /** Conversion for "reference": writes references as ids and resolves them on read. */
+    public class ReferenceHandler extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Reference.REFERENCE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) {
+        // read side: resolve the record or save a callback
+        final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+
+        Long id = (Long) record.get(refField.pos());
+        if (id != null) {
+          if (references.containsKey(id)) {
+            record.put(refField.pos(), references.get(id));
+          } else {
+            List<Callback> callbacks = callbacksById.get(id);
+            if (callbacks == null) {
+              callbacks = new ArrayList<Callback>();
+              callbacksById.put(id, callbacks);
+            }
+            // add a callback to resolve this reference when the id is available
+            callbacks.add(new Callback() {
+              @Override
+              public void set(Object referenceable) {
+                record.put(refField.pos(), referenceable);
+              }
+            });
+          }
+        }
+
+        return record;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) {
+        // write side: replace a referenced field with its id
+        Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+        IndexedRecord referenced = (IndexedRecord) record.get(refField.pos());
+        if (referenced == null) {
+          return record;
+        }
+
+        // hijack the field to return the id instead of the ref; the id is null
+        // if the referenced record was not passed through the tracker first
+        return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced));
+      }
+    }
+
+    /** Read-only view of a record that substitutes one field's value. */
+    private static class HijackingIndexedRecord implements IndexedRecord {
+      private final IndexedRecord wrapped;
+      private final int index;
+      private final Object data;
+
+      public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) {
+        this.wrapped = wrapped;
+        this.index = index;
+        this.data = data;
+      }
+
+      @Override
+      public void put(int i, Object v) {
+        throw new RuntimeException("[BUG] This is a read-only class.");
+      }
+
+      @Override
+      public Object get(int i) {
+        if (i == index) {
+          return data;
+        }
+        return wrapped.get(i);
+      }
+
+      @Override
+      public Schema getSchema() {
+        return wrapped.getSchema();
+      }
+    }
+  }
+
+  /**
+   * Writes a parent record whose child refers back to it, reads the file back
+   * with the same data model, and checks that the child's reference resolves
+   * to a record carrying the parent's data.
+   */
+  @Test
+  public void test() throws IOException {
+    ReferenceManager manager = new ReferenceManager();
+    GenericData model = new GenericData();
+    // both conversions target Record, so the model must keep one per logical type
+    model.addLogicalTypeConversion(manager.getTracker());
+    model.addLogicalTypeConversion(manager.getHandler());
+
+    Schema parentSchema = Schema.createRecord("Parent", null, null, false);
+
+    Schema parentRefSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.LONG),
+        parentSchema);
+    Reference parentRef = new Reference("parent");
+
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+    childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null));
+    childFields.add(new Schema.Field("parent", parentRefSchema, null, null));
+    Schema childSchema = parentRef.addToSchema(
+        Schema.createRecord("Child", null, null, false, childFields));
+
+    List<Schema.Field> parentFields = new ArrayList<Schema.Field>();
+    parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null));
+    parentFields.add(new Schema.Field("child", childSchema, null, null));
+    parentSchema.setFields(parentFields);
+    Referenceable idRef = new Referenceable("id");
+
+    Schema schema = idRef.addToSchema(parentSchema);
+
+    System.out.println("Schema: " + schema.toString(true));
+
+    Record parent = new Record(schema);
+    parent.put("id", 1L);
+    parent.put("p", "parent data!");
+
+    Record child = new Record(childSchema);
+    child.put("c", "child data!");
+    child.put("parent", parent);
+
+    parent.put("child", child);
+
+    // serialization round trip
+    File data = write(model, schema, parent);
+    List<Record> records = read(model, schema, data);
+
+    Record actual = records.get(0);
+
+    // because the record is a recursive structure, equals won't work
+    Assert.assertEquals("Should correctly read back the parent id",
+        1L, actual.get("id"));
+    Assert.assertEquals("Should correctly read back the parent data",
+        new Utf8("parent data!"), actual.get("p"));
+
+    Record actualChild = (Record) actual.get("child");
+    Assert.assertEquals("Should correctly read back the child data",
+        new Utf8("child data!"), actualChild.get("c"));
+    Object childParent = actualChild.get("parent");
+    Assert.assertTrue("Should have a parent Record object",
+        childParent instanceof Record);
+
+    Record childParentRecord = (Record) actualChild.get("parent");
+    Assert.assertEquals("Should have the right parent id",
+        1L, childParentRecord.get("id"));
+    Assert.assertEquals("Should have the right parent data",
+        new Utf8("parent data!"), childParentRecord.get("p"));
+  }
+
+  /** Reads every datum in {@code file} using a reader created by {@code model}. */
+  private <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+    DatumReader<D> reader = newReader(model, schema);
+    List<D> data = new ArrayList<D>();
+    FileReader<D> fileReader = null;
+
+    try {
+      fileReader = new DataFileReader<D>(file, reader);
+      for (D datum : fileReader) {
+        data.add(datum);
+      }
+    } finally {
+      if (fileReader != null) {
+        fileReader.close();
+      }
+    }
+
+    return data;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> DatumReader<D> newReader(GenericData model, Schema schema) {
+    return model.createDatumReader(schema);
+  }
+
+  /** Writes {@code data} to a new temporary Avro data file and returns that file. */
+  @SuppressWarnings("unchecked")
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    File file = temp.newFile();
+    DatumWriter<D> writer = model.createDatumWriter(schema);
+    DataFileWriter<D> fileWriter = new DataFileWriter<D>(writer);
+
+    try {
+      fileWriter.create(schema, file);
+      for (D datum : data) {
+        fileWriter.append(datum);
+      }
+    } finally {
+      fileWriter.close();
+    }
+
+    return file;
+  }
+}