You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by to...@apache.org on 2015/07/14 16:07:24 UTC

svn commit: r1690939 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/reflect/ lang/java/avro/src/test/java/org/apache/avro/

Author: tomwhite
Date: Tue Jul 14 14:07:23 2015
New Revision: 1690939

URL: http://svn.apache.org/r1690939
Log:
AVRO-1692. Allow more than one logical type for a Java class. Contributed by Ryan Blue.

Added:
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jul 14 14:07:23 2015
@@ -98,6 +98,9 @@ Trunk (not yet released)
     AVRO-1697. Ruby: Add support for the Snappy codec to the Ruby library.
     (Daniel Schierbeck via tomwhite)
 
+    AVRO-1692. Allow more than one logical type for a Java class. (blue via
+    tomwhite)
+
   BUG FIXES
 
     AVRO-1553. Java: MapReduce never uses MapOutputValueSchema (tomwhite)

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Tue Jul 14 14:07:23 2015
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
 import java.util.WeakHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -94,39 +95,69 @@ public class GenericData {
   /** Return the class loader that's used (by subclasses). */
   public ClassLoader getClassLoader() { return classLoader; }
 
-  public Map<String, Conversion<?>> conversions =
+  private Map<String, Conversion<?>> conversions =
       new HashMap<String, Conversion<?>>();
 
-  public Map<Class<?>, Conversion<?>> conversionsByClass =
-      new IdentityHashMap<Class<?>, Conversion<?>>();
+  private Map<Class<?>, Map<String, Conversion<?>>> conversionsByClass =
+      new IdentityHashMap<Class<?>, Map<String, Conversion<?>>>();
 
+  /**
+   * Registers the given conversion to be used when reading and writing with
+   * this data model.
+   *
+   * @param conversion a logical type Conversion.
+   */
   public void addLogicalTypeConversion(Conversion<?> conversion) {
     conversions.put(conversion.getLogicalTypeName(), conversion);
-    conversionsByClass.put(conversion.getConvertedType(), conversion);
+    Class<?> type = conversion.getConvertedType();
+    if (conversionsByClass.containsKey(type)) {
+      conversionsByClass.get(type).put(
+          conversion.getLogicalTypeName(), conversion);
+    } else {
+      Map<String, Conversion<?>> conversions = new LinkedHashMap<String, Conversion<?>>();
+      conversions.put(conversion.getLogicalTypeName(), conversion);
+      conversionsByClass.put(type, conversions);
+    }
   }
 
+  /**
+   * Returns the first conversion that was registered for the given class.
+   *
+   * @param datumClass the converted Java class to look up
+   * @return the first registered conversion for the class, or null if none
+   */
   @SuppressWarnings("unchecked")
-  public <T> Conversion<? super T> getConversionFrom(Class<T> datumClass,
-                                                     LogicalType logicalType) {
-    Conversion<?> conversion = conversionsByClass.get(datumClass);
-    if (conversion != null &&
-        conversion.getLogicalTypeName().equals(logicalType.getName())) {
-      return (Conversion<T>) conversion;
+  public <T> Conversion<T> getConversionByClass(Class<T> datumClass) {
+    Map<String, Conversion<?>> conversions = conversionsByClass.get(datumClass);
+    if (conversions != null) {
+      return (Conversion<T>) conversions.values().iterator().next();
     }
     return null;
   }
 
+  /**
+   * Returns the conversion registered for the given class and logical type.
+   *
+   * @param datumClass the converted Java class to look up
+   * @param logicalType the LogicalType whose name selects the conversion; not null
+   * @return the matching conversion, or null if none is registered
+   */
   @SuppressWarnings("unchecked")
-  public <T> Conversion<? extends T> getConversionTo(Class<T> datumClass,
-                                                     LogicalType logicalType) {
-    Conversion<?> conversion = conversionsByClass.get(datumClass);
-    if (conversion != null &&
-        conversion.getLogicalTypeName().equals(logicalType.getName())) {
-      return (Conversion<T>) conversion;
+  public <T> Conversion<T> getConversionByClass(Class<T> datumClass,
+                                                LogicalType logicalType) {
+    Map<String, Conversion<?>> conversions = conversionsByClass.get(datumClass);
+    if (conversions != null) {
+      return (Conversion<T>) conversions.get(logicalType.getName());
     }
     return null;
   }
 
+  /**
+   * Returns the Conversion for the given logical type.
+   *
+   * @param logicalType a logical type
+   * @return the conversion for the logical type, or null
+   */
   @SuppressWarnings("unchecked")
   public Conversion<Object> getConversionFor(LogicalType logicalType) {
     if (logicalType == null) {
@@ -657,15 +688,16 @@ public class GenericData {
     // this allows logical type concrete classes to overlap with supported ones
     // for example, a conversion could return a map
     if (datum != null) {
-      Conversion<?> conversion = conversionsByClass.get(datum.getClass());
-      if (conversion != null) {
-        String logicalTypeName = conversion.getLogicalTypeName();
+      Map<String, Conversion<?>> conversions = conversionsByClass.get(datum.getClass());
+      if (conversions != null) {
         List<Schema> candidates = union.getTypes();
         for (int i = 0; i < candidates.size(); i += 1) {
-          LogicalType candidateLogicalType = candidates.get(i).getLogicalType();
-          if (candidateLogicalType != null &&
-              logicalTypeName.equals(candidateLogicalType.getName())) {
-            return i;
+          LogicalType candidateType = candidates.get(i).getLogicalType();
+          if (candidateType != null) {
+            Conversion<?> conversion = conversions.get(candidateType.getName());
+            if (conversion != null) {
+              return i;
+            }
           }
         }
       }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java Tue Jul 14 14:07:23 2015
@@ -66,7 +66,7 @@ public class GenericDatumWriter<D> imple
     LogicalType logicalType = schema.getLogicalType();
     if (datum != null && logicalType != null) {
       Conversion<?> conversion = getData()
-          .getConversionFrom(datum.getClass(), logicalType);
+          .getConversionByClass(datum.getClass(), logicalType);
       writeWithoutConversion(schema,
           convert(schema, logicalType, conversion, datum), out);
     } else {

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Tue Jul 14 14:07:23 2015
@@ -561,10 +561,9 @@ public class ReflectData extends Specifi
         return Schema.create(Schema.Type.BYTES);
       if (Collection.class.isAssignableFrom(c))              // array
         throw new AvroRuntimeException("Can't find element type of Collection");
-      for (Conversion<?> conversion : conversions.values()) {  // logical type
-        if (conversion.getConvertedType().isAssignableFrom(c)) {
-          return conversion.getRecommendedSchema();
-        }
+      Conversion<?> conversion = getConversionByClass(c);
+      if (conversion != null) {
+        return conversion.getRecommendedSchema();
       }
       String fullName = c.getName();
       Schema schema = names.get(fullName);

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1690939&r1=1690938&r2=1690939&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java Tue Jul 14 14:07:23 2015
@@ -285,7 +285,7 @@ public class ReflectDatumReader<T> exten
         }
         LogicalType logicalType = f.schema().getLogicalType();
         if (logicalType != null) {
-          Conversion<?> conversion = getData().getConversionTo(
+          Conversion<?> conversion = getData().getConversionByClass(
               accessor.getField().getType(), logicalType);
           if (conversion != null) {
             try {

Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java?rev=1690939&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java Tue Jul 14 14:07:23 2015
@@ -0,0 +1,387 @@
+package org.apache.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCircularReferences {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static class Reference extends LogicalType {
+    private static final String REFERENCE = "reference";
+    private static final String REF_FIELD_NAME = "ref-field-name";
+
+    private final String refFieldName;
+
+    public Reference(String refFieldName) {
+      super(REFERENCE);
+      this.refFieldName = refFieldName;
+    }
+
+    public Reference(Schema schema) {
+      super(REFERENCE);
+      this.refFieldName = schema.getProp(REF_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(REF_FIELD_NAME, refFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCE;
+    }
+
+    public String getRefFieldName() {
+      return refFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getField(refFieldName) == null) {
+        throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName);
+      }
+    }
+  }
+
+  public static class Referenceable extends LogicalType {
+    private static final String REFERENCEABLE = "referenceable";
+    private static final String ID_FIELD_NAME = "id-field-name";
+
+    private final String idFieldName;
+
+    public Referenceable(String idFieldName) {
+      super(REFERENCEABLE);
+      this.idFieldName = idFieldName;
+    }
+
+    public Referenceable(Schema schema) {
+      super(REFERENCEABLE);
+      this.idFieldName = schema.getProp(ID_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(ID_FIELD_NAME, idFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCEABLE;
+    }
+
+    public String getIdFieldName() {
+      return idFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      Schema.Field idField = schema.getField(idFieldName);
+      if (idField == null || idField.schema().getType() != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void addReferenceTypes() {
+    LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Referenceable(schema);
+      }
+    });
+    LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Reference(schema);
+      }
+    });
+  }
+
+  public static class ReferenceManager {
+    private interface Callback {
+      void set(Object referenceable);
+    }
+
+    private final Map<Long, Object> references = new HashMap<Long, Object>();
+    private final Map<Object, Long> ids = new IdentityHashMap<Object, Long>();
+    private final Map<Long, List<Callback>> callbacksById = new HashMap<Long, List<Callback>>();
+    private final ReferenceableTracker tracker = new ReferenceableTracker();
+    private final ReferenceHandler handler = new ReferenceHandler();
+
+    public ReferenceableTracker getTracker() {
+      return tracker;
+    }
+
+    public ReferenceHandler getHandler() {
+      return handler;
+    }
+
+    public class ReferenceableTracker extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Referenceable.REFERENCEABLE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // read side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        references.put(id, value);
+
+        // call any callbacks waiting to resolve this id
+        List<Callback> callbacks = callbacksById.get(id);
+        for (Callback callback : callbacks) {
+          callback.set(value);
+        }
+
+        return value;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // write side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        //references.put(id, value);
+        ids.put(value, id);
+
+        return value;
+      }
+
+      private long getId(IndexedRecord referenceable, Schema schema) {
+        Referenceable info = (Referenceable) schema.getLogicalType();
+        int idField = schema.getField(info.getIdFieldName()).pos();
+        return (Long) referenceable.get(idField);
+      }
+    }
+
+    public class ReferenceHandler extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Reference.REFERENCE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) {
+        // read side: resolve the record or save a callback
+        final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+
+        Long id = (Long) record.get(refField.pos());
+        if (id != null) {
+          if (references.containsKey(id)) {
+            record.put(refField.pos(), references.get(id));
+
+          } else {
+            List<Callback> callbacks = callbacksById.get(id);
+            if (callbacks == null) {
+              callbacks = new ArrayList<Callback>();
+              callbacksById.put(id, callbacks);
+            }
+            // add a callback to resolve this reference when the id is available
+            callbacks.add(new Callback() {
+              @Override
+              public void set(Object referenceable) {
+                record.put(refField.pos(), referenceable);
+              }
+            });
+          }
+        }
+
+        return record;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) {
+        // write side: replace a referenced field with its id
+        Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+        IndexedRecord referenced = (IndexedRecord) record.get(refField.pos());
+        if (referenced == null) {
+          return record;
+        }
+
+        // hijack the field to return the id instead of the ref
+        return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced));
+      }
+    }
+
+    private static class HijackingIndexedRecord implements IndexedRecord {
+      private final IndexedRecord wrapped;
+      private final int index;
+      private final Object data;
+
+      public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) {
+        this.wrapped = wrapped;
+        this.index = index;
+        this.data = data;
+      }
+
+      @Override
+      public void put(int i, Object v) {
+        throw new RuntimeException("[BUG] This is a read-only class.");
+      }
+
+      @Override
+      public Object get(int i) {
+        if (i == index) {
+          return data;
+        }
+        return wrapped.get(i);
+      }
+
+      @Override
+      public Schema getSchema() {
+        return wrapped.getSchema();
+      }
+    }
+  }
+
+  @Test
+  public void test() throws IOException {
+    ReferenceManager manager = new ReferenceManager();
+    GenericData model = new GenericData();
+    model.addLogicalTypeConversion(manager.getTracker());
+    model.addLogicalTypeConversion(manager.getHandler());
+
+    Schema parentSchema = Schema.createRecord("Parent", null, null, false);
+
+    Schema parentRefSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.LONG),
+        parentSchema);
+    Reference parentRef = new Reference("parent");
+
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+    childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null));
+    childFields.add(new Schema.Field("parent", parentRefSchema, null, null));
+    Schema childSchema = parentRef.addToSchema(
+        Schema.createRecord("Child", null, null, false, childFields));
+
+    List<Schema.Field> parentFields = new ArrayList<Schema.Field>();
+    parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null));
+    parentFields.add(new Schema.Field("child", childSchema, null, null));
+    parentSchema.setFields(parentFields);
+    Referenceable idRef = new Referenceable("id");
+
+    Schema schema = idRef.addToSchema(parentSchema);
+
+    System.out.println("Schema: " + schema.toString(true));
+
+    Record parent = new Record(schema);
+    parent.put("id", 1L);
+    parent.put("p", "parent data!");
+
+    Record child = new Record(childSchema);
+    child.put("c", "child data!");
+    child.put("parent", parent);
+
+    parent.put("child", child);
+
+    // serialization round trip
+    File data = write(model, schema, parent);
+    List<Record> records = read(model, schema, data);
+
+    Record actual = records.get(0);
+
+    // because the record is a recursive structure, equals won't work
+    Assert.assertEquals("Should correctly read back the parent id",
+        1L, actual.get("id"));
+    Assert.assertEquals("Should correctly read back the parent data",
+        new Utf8("parent data!"), actual.get("p"));
+
+    Record actualChild = (Record) actual.get("child");
+    Assert.assertEquals("Should correctly read back the child data",
+        new Utf8("child data!"), actualChild.get("c"));
+    Object childParent = actualChild.get("parent");
+    Assert.assertTrue("Should have a parent Record object",
+        childParent instanceof Record);
+
+    Record childParentRecord = (Record) actualChild.get("parent");
+    Assert.assertEquals("Should have the right parent id",
+        1L, childParentRecord.get("id"));
+    Assert.assertEquals("Should have the right parent data",
+        new Utf8("parent data!"), childParentRecord.get("p"));
+  }
+
+  private <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+    DatumReader<D> reader = newReader(model, schema);
+    List<D> data = new ArrayList<D>();
+    FileReader<D> fileReader = null;
+
+    try {
+      fileReader = new DataFileReader<D>(file, reader);
+      for (D datum : fileReader) {
+        data.add(datum);
+      }
+    } finally {
+      if (fileReader != null) {
+        fileReader.close();
+      }
+    }
+
+    return data;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> DatumReader<D> newReader(GenericData model, Schema schema) {
+    return model.createDatumReader(schema);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    File file = temp.newFile();
+    DatumWriter<D> writer = model.createDatumWriter(schema);
+    DataFileWriter<D> fileWriter = new DataFileWriter<D>(writer);
+
+    try {
+      fileWriter.create(schema, file);
+      for (D datum : data) {
+        fileWriter.append(datum);
+      }
+    } finally {
+      fileWriter.close();
+    }
+
+    return file;
+  }
+}