You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2018/12/11 19:07:12 UTC

[avro] branch master updated: Improved conversions handling + pluggable conversions support [AVRO-1891, AVRO-2065] (#329)

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ed38d7  Improved conversions handling + pluggable conversions support [AVRO-1891, AVRO-2065] (#329)
7ed38d7 is described below

commit 7ed38d7c7e150987ef8bf035196576fc158e03eb
Author: Katrin Skoglund <ka...@avanza.se>
AuthorDate: Tue Dec 11 20:07:08 2018 +0100

    Improved conversions handling + pluggable conversions support [AVRO-1891, AVRO-2065] (#329)
    
    * Added end-to-end test that reproduces union with logical types problem
    
    * Adding required conversions to SpecificData in generated class
    (same as in SpecificCompiler)
    
    * Added test with BigDecimal
    
    * Added test with BigDecimal
    
    * Introduced customizable conversions in compiler and Maven plugin.
    
    * Fixed bug
    
    * Fixed Maven plugin classpath
    
    * Get the correct SpecificData whenever possible, to get the right conversions
    
    * No need to expose the map of conversions so expose only the values.
    
    * Better tests
    
    * Default values and conversions
    
    * Cleanup of some changes in Maven plugin
    
    * Fixed equals() for classes with nested logical types. Improved tests
    
    * Added missing copyright statement
    
    * Fixed compile error after rebase
    
    * Fixed problem with logical types in nested records.
    
    * Fixed failing test.
    
    * Fixed serialization problem when creating SpecificDatumReader from a class
---
 .../org/apache/avro/data/RecordBuilderBase.java    |  32 +----
 .../java/org/apache/avro/generic/GenericData.java  |   4 +
 .../org/apache/avro/specific/SpecificData.java     |  50 ++++++++
 .../apache/avro/specific/SpecificDatumReader.java  |   6 +-
 .../apache/avro/specific/SpecificDatumWriter.java  |   4 +-
 .../apache/avro/specific/SpecificRecordBase.java   |  13 +-
 .../avro/specific/SpecificRecordBuilderBase.java   |   6 +-
 .../specific/TestRecordWithJsr310LogicalTypes.java |  20 ++--
 .../avro/compiler/specific/SpecificCompiler.java   |  81 +++++++++++--
 .../specific/templates/java/classic/record.vm      |  28 ++---
 .../compiler/specific/TestSpecificCompiler.java    | 132 +++++++++++++++++++--
 lang/java/integration-test/codegen-test/pom.xml    |  91 ++++++++++++++
 .../codegentest/AbstractSpecificRecordTest.java    |  73 ++++++++++++
 .../avro/codegentest/TestCustomConversion.java     |  45 +++++++
 .../codegentest/TestLogicalTypesWithDefaults.java  |  58 +++++++++
 .../avro/codegentest/TestNestedLogicalTypes.java   |  67 +++++++++++
 .../avro/codegentest/TestNullableLogicalTypes.java |  45 +++++++
 .../src/test/resources/avro/custom_conversion.avsc |  12 ++
 .../avro/logical_types_with_default_values.avsc    |  12 ++
 .../resources/avro/nested_logical_types_array.avsc |  26 ++++
 .../resources/avro/nested_logical_types_map.avsc   |  26 ++++
 .../avro/nested_logical_types_record.avsc          |  23 ++++
 .../resources/avro/nested_logical_types_union.avsc |  23 ++++
 .../resources/avro/nullable_logical_types.avsc     |  11 ++
 .../avro/nullable_logical_types_array.avsc         |  16 +++
 lang/java/integration-test/pom.xml                 |  99 ++++++++++++++++
 .../test-custom-conversions/pom.xml                |  45 +++++++
 .../org.apache.avro.codegentest/CustomDecimal.java |  65 ++++++++++
 .../CustomDecimalConversion.java                   |  52 ++++++++
 lang/java/mapred/pom.xml                           |  12 ++
 .../org/apache/avro/mojo/AbstractAvroMojo.java     |  32 +++++
 .../java/org/apache/avro/mojo/IDLProtocolMojo.java |   6 +-
 .../java/org/apache/avro/mojo/ProtocolMojo.java    |  14 +++
 .../main/java/org/apache/avro/mojo/SchemaMojo.java |  13 ++
 lang/java/pom.xml                                  |   1 +
 .../avro/examples/baseball/Player.java             |   1 +
 .../tools/src/test/compiler/output/Player.java     |   1 +
 37 files changed, 1153 insertions(+), 92 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java b/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
index 106c500..6d2f4c1 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
@@ -17,19 +17,16 @@
  */
 package org.apache.avro.data;
 
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Conversion;
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
+import java.util.Arrays;
+
 /** Abstract base class for RecordBuilder implementations.  Not thread-safe. */
 public abstract class RecordBuilderBase<T extends IndexedRecord>
   implements RecordBuilder<T> {
@@ -138,29 +135,6 @@ public abstract class RecordBuilderBase<T extends IndexedRecord>
     return data.deepCopy(field.schema(), data.getDefaultValue(field));
   }
 
-  /**
-   * Gets the default value of the given field, if any. Pass in a conversion
-   * to convert data to logical type class. Please make sure the schema does
-   * have a logical type, otherwise an exception would be thrown out.
-   * @param field the field whose default value should be retrieved.
-   * @param conversion the tool to convert data to logical type class
-   * @return the default value associated with the given field,
-   * or null if none is specified in the schema.
-   * @throws IOException
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected Object defaultValue(Field field, Conversion<?> conversion) throws IOException {
-    Schema schema = field.schema();
-    LogicalType logicalType = schema.getLogicalType();
-    Object rawDefaultValue = data.deepCopy(schema, data.getDefaultValue(field));
-    if (conversion == null || logicalType == null) {
-      return rawDefaultValue;
-    } else {
-      return Conversions.convertToLogicalType(rawDefaultValue, schema,
-          logicalType, conversion);
-    }
-  }
-
   @Override
   public int hashCode() {
     final int prime = 31;
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
index 6dffa15..7294192 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
@@ -105,6 +105,10 @@ public class GenericData {
   private Map<Class<?>, Map<String, Conversion<?>>> conversionsByClass =
       new IdentityHashMap<>();
 
+  public Collection<Conversion<?>> getConversions() {
+    return conversions.values();
+  }
+
   /**
    * Registers the given conversion to be used when reading and writing with
    * this data model.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
index 21c8d8c..7f53f97 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
@@ -17,6 +17,7 @@
  */
 package org.apache.avro.specific;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
@@ -132,6 +133,55 @@ public class SpecificData extends GenericData {
   /** Return the singleton instance. */
   public static SpecificData get() { return INSTANCE; }
 
+  /**
+   * For RECORD type schemas, this method returns the SpecificData instance of the class associated with the schema,
+   * in order to get the right conversions for any logical types used.
+   *
+   * @param reader the reader schema
+   * @return the SpecificData associated with the schema's class, or the default instance.
+   */
+  public static SpecificData getForSchema(Schema reader) {
+    if (reader.getType() == Type.RECORD) {
+      final String className = getClassName(reader);
+      if (className != null) {
+        final Class<?> clazz;
+        try {
+          clazz = Class.forName(className);
+          return getForClass(clazz);
+        } catch (ClassNotFoundException e) {
+          return SpecificData.get();
+        }
+      }
+    }
+    return SpecificData.get();
+  }
+
+  /**
+   * If the given class is assignable to {@link SpecificRecordBase}, this method returns the SpecificData instance
+   * from the field {@code MODEL$}, in order to get the correct {@link org.apache.avro.Conversion} instances for the class.
+   * Falls back to the default instance {@link SpecificData#get()} for other classes or if the field is not found.
+   *
+   * @param c A class
+   * @param <T> the type of the class {@code c}
+   * @return The SpecificData from the SpecificRecordBase instance, or the default SpecificData instance.
+   */
+  public static <T> SpecificData getForClass(Class<T> c) {
+    if (SpecificRecordBase.class.isAssignableFrom(c)) {
+      final Field specificDataField;
+      try {
+        specificDataField = c.getDeclaredField("MODEL$");
+        specificDataField.setAccessible(true);
+        return (SpecificData) specificDataField.get(null);
+      } catch (NoSuchFieldException e) {
+        // Return default instance
+        return SpecificData.get();
+      } catch (IllegalAccessException e) {
+        throw new AvroRuntimeException(e);
+      }
+    }
+    return SpecificData.get();
+  }
+
   private boolean useCustomCoderFlag
     = Boolean.parseBoolean(System.getProperty("org.apache.avro.specific.use_custom_coders","false"));
 
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumReader.java
index ccf8107..7fc91df 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumReader.java
@@ -33,18 +33,18 @@ public class SpecificDatumReader<T> extends GenericDatumReader<T> {
 
   /** Construct for reading instances of a class. */
   public SpecificDatumReader(Class<T> c) {
-    this(new SpecificData(c.getClassLoader()));
+    this(SpecificData.getForClass(c));
     setSchema(getSpecificData().getSchema(c));
   }
 
   /** Construct where the writer's and reader's schemas are the same. */
   public SpecificDatumReader(Schema schema) {
-    this(schema, schema, SpecificData.get());
+    this(schema, schema, SpecificData.getForSchema(schema));
   }
 
   /** Construct given writer's and reader's schema. */
   public SpecificDatumReader(Schema writer, Schema reader) {
-    this(writer, reader, SpecificData.get());
+    this(writer, reader, SpecificData.getForSchema(reader));
   }
 
   /** Construct given writer's schema, reader's schema, and a {@link
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumWriter.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumWriter.java
index 3d5e7ff..e4662be 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumWriter.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificDatumWriter.java
@@ -32,11 +32,11 @@ public class SpecificDatumWriter<T> extends GenericDatumWriter<T> {
   }
 
   public SpecificDatumWriter(Class<T> c) {
-    super(SpecificData.get().getSchema(c), SpecificData.get());
+    super(SpecificData.get().getSchema(c), SpecificData.getForClass(c));
   }
 
   public SpecificDatumWriter(Schema schema) {
-    super(schema, SpecificData.get());
+    super(schema, SpecificData.getForSchema(schema));
   }
 
   public SpecificDatumWriter(Schema root, SpecificData specificData) {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBase.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBase.java
index eed41b5..ac003ba 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBase.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBase.java
@@ -37,6 +37,11 @@ public abstract class SpecificRecordBase
   public abstract Object get(int field);
   public abstract void put(int field, Object value);
 
+  public SpecificData getSpecificData() {
+    // Default implementation for backwards compatibility, overridden in generated code
+    return SpecificData.get();
+  }
+
   public Conversion<?> getConversion(int field) {
     // for backward-compatibility. no older specific classes have conversions.
     return null;
@@ -61,22 +66,22 @@ public abstract class SpecificRecordBase
     if (that == this) return true;                        // identical object
     if (!(that instanceof SpecificRecord)) return false;  // not a record
     if (this.getClass() != that.getClass()) return false; // not same schema
-    return SpecificData.get().compare(this, that, this.getSchema(), true) == 0;
+    return getSpecificData().compare(this, that, this.getSchema(), true) == 0;
   }
 
   @Override
   public int hashCode() {
-    return SpecificData.get().hashCode(this, this.getSchema());
+    return getSpecificData().hashCode(this, this.getSchema());
   }
 
   @Override
   public int compareTo(SpecificRecord that) {
-    return SpecificData.get().compare(this, that, this.getSchema());
+    return getSpecificData().compare(this, that, this.getSchema());
   }
 
   @Override
   public String toString() {
-    return SpecificData.get().toString(this);
+    return getSpecificData().toString(this);
   }
 
   @Override
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBuilderBase.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBuilderBase.java
index ecf3c34..a8d220b 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBuilderBase.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificRecordBuilderBase.java
@@ -32,7 +32,7 @@ abstract public class SpecificRecordBuilderBase<T extends SpecificRecord>
    * @param schema the schema associated with the record class.
    */
   protected SpecificRecordBuilderBase(Schema schema) {
-    super(schema, SpecificData.get());
+    super(schema, SpecificData.getForSchema(schema));
   }
 
   /**
@@ -40,7 +40,7 @@ abstract public class SpecificRecordBuilderBase<T extends SpecificRecord>
    * @param other SpecificRecordBuilderBase instance to copy.
    */
   protected SpecificRecordBuilderBase(SpecificRecordBuilderBase<T> other) {
-    super(other, SpecificData.get());
+    super(other, other.data());
   }
 
   /**
@@ -48,6 +48,6 @@ abstract public class SpecificRecordBuilderBase<T extends SpecificRecord>
    * @param other the record instance to copy.
    */
   protected SpecificRecordBuilderBase(T other) {
-    super(other.getSchema(), SpecificData.get());
+    super(other.getSchema(), SpecificData.getForSchema(other.getSchema()));
   }
 }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithJsr310LogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithJsr310LogicalTypes.java
index 56e31f4..352e5f0 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithJsr310LogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithJsr310LogicalTypes.java
@@ -854,16 +854,16 @@ public class TestRecordWithJsr310LogicalTypes extends org.apache.avro.specific.S
     public TestRecordWithJsr310LogicalTypes build() {
       try {
         TestRecordWithJsr310LogicalTypes record = new TestRecordWithJsr310LogicalTypes();
-        record.b = fieldSetFlags()[0] ? this.b : (java.lang.Boolean) defaultValue(fields()[0], record.getConversion(0));
-        record.i32 = fieldSetFlags()[1] ? this.i32 : (java.lang.Integer) defaultValue(fields()[1], record.getConversion(1));
-        record.i64 = fieldSetFlags()[2] ? this.i64 : (java.lang.Long) defaultValue(fields()[2], record.getConversion(2));
-        record.f32 = fieldSetFlags()[3] ? this.f32 : (java.lang.Float) defaultValue(fields()[3], record.getConversion(3));
-        record.f64 = fieldSetFlags()[4] ? this.f64 : (java.lang.Double) defaultValue(fields()[4], record.getConversion(4));
-        record.s = fieldSetFlags()[5] ? this.s : (java.lang.CharSequence) defaultValue(fields()[5], record.getConversion(5));
-        record.d = fieldSetFlags()[6] ? this.d : (java.time.LocalDate) defaultValue(fields()[6], record.getConversion(6));
-        record.t = fieldSetFlags()[7] ? this.t : (java.time.LocalTime) defaultValue(fields()[7], record.getConversion(7));
-        record.ts = fieldSetFlags()[8] ? this.ts : (java.time.Instant) defaultValue(fields()[8], record.getConversion(8));
-        record.dec = fieldSetFlags()[9] ? this.dec : (java.math.BigDecimal) defaultValue(fields()[9], record.getConversion(9));
+        record.b = fieldSetFlags()[0] ? this.b : (java.lang.Boolean) defaultValue(fields()[0]);
+        record.i32 = fieldSetFlags()[1] ? this.i32 : (java.lang.Integer) defaultValue(fields()[1]);
+        record.i64 = fieldSetFlags()[2] ? this.i64 : (java.lang.Long) defaultValue(fields()[2]);
+        record.f32 = fieldSetFlags()[3] ? this.f32 : (java.lang.Float) defaultValue(fields()[3]);
+        record.f64 = fieldSetFlags()[4] ? this.f64 : (java.lang.Double) defaultValue(fields()[4]);
+        record.s = fieldSetFlags()[5] ? this.s : (java.lang.CharSequence) defaultValue(fields()[5]);
+        record.d = fieldSetFlags()[6] ? this.d : (java.time.LocalDate) defaultValue(fields()[6]);
+        record.t = fieldSetFlags()[7] ? this.t : (java.time.LocalTime) defaultValue(fields()[7]);
+        record.ts = fieldSetFlags()[8] ? this.ts : (java.time.Instant) defaultValue(fields()[8]);
+        record.dec = fieldSetFlags()[9] ? this.dec : (java.math.BigDecimal) defaultValue(fields()[9]);
         return record;
       } catch (java.lang.Exception e) {
         throw new org.apache.avro.AvroRuntimeException(e);
diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
index b92025f..1936bd6 100644
--- a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
+++ b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
@@ -293,6 +293,72 @@ public class SpecificCompiler {
     return dateTimeLogicalTypeImplementation;
   }
 
+  public void addCustomConversion(Class<?> conversionClass) {
+    try {
+      final Conversion<?> conversion = (Conversion<?>)conversionClass.newInstance();
+      specificData.addLogicalTypeConversion(conversion);
+    }  catch (IllegalAccessException | InstantiationException e) {
+      throw new RuntimeException("Failed to instantiate conversion class " + conversionClass, e);
+    }
+  }
+
+  public Collection<String> getUsedConversionClasses(Schema schema) {
+    LinkedHashMap<String, Conversion<?>> classnameToConversion = new LinkedHashMap<>();
+    for (Conversion<?> conversion : specificData.getConversions()) {
+      classnameToConversion.put(conversion.getConvertedType().getCanonicalName(), conversion);
+    }
+    Collection<String> result = new HashSet<>();
+    for (String className : getClassNamesOfPrimitiveFields(schema)) {
+      if (classnameToConversion.containsKey(className)) {
+        result.add(classnameToConversion.get(className).getClass().getCanonicalName());
+      }
+    }
+    return result;
+  }
+
+  private Set<String> getClassNamesOfPrimitiveFields(Schema schema) {
+    Set<String> result = new HashSet<>();
+    getClassNamesOfPrimitiveFields(schema, result, new HashSet<>());
+    return result;
+  }
+
+  private void getClassNamesOfPrimitiveFields(Schema schema, Set<String> result, Set<Schema> seenSchemas) {
+    if (seenSchemas.contains(schema)) {
+      return;
+    }
+    seenSchemas.add(schema);
+    switch (schema.getType()) {
+      case RECORD:
+        for (Schema.Field field : schema.getFields()) {
+          getClassNamesOfPrimitiveFields(field.schema(), result, seenSchemas);
+        }
+        break;
+      case MAP:
+        getClassNamesOfPrimitiveFields(schema.getValueType(), result, seenSchemas);
+        break;
+      case ARRAY:
+        getClassNamesOfPrimitiveFields(schema.getElementType(), result, seenSchemas);
+        break;
+      case UNION:
+        for (Schema s : schema.getTypes())
+          getClassNamesOfPrimitiveFields(s, result, seenSchemas);
+        break;
+      case ENUM:
+      case FIXED:
+      case NULL:
+        break;
+      case STRING: case BYTES:
+      case INT: case LONG:
+      case FLOAT: case DOUBLE:
+      case BOOLEAN:
+        result.add(javaType(schema));
+        break;
+      default: throw new RuntimeException("Unknown type: "+schema);
+    }
+  }
+
+  private static String logChuteName = null;
+
   private void initializeVelocity() {
     this.velocityEngine = new VelocityEngine();
 
@@ -810,14 +876,13 @@ public class SpecificCompiler {
       return "null";
     }
 
-    if (LogicalTypes.date().equals(schema.getLogicalType())) {
-      return "DATE_CONVERSION";
-    } else if (LogicalTypes.timeMillis().equals(schema.getLogicalType())) {
-      return "TIME_CONVERSION";
-    } else if (LogicalTypes.timestampMillis().equals(schema.getLogicalType())) {
-      return "TIMESTAMP_CONVERSION";
-    } else if (LogicalTypes.Decimal.class.equals(schema.getLogicalType().getClass())) {
-      return enableDecimalLogicalType ? "DECIMAL_CONVERSION" : "null";
+    if (LogicalTypes.Decimal.class.equals(schema.getLogicalType().getClass()) && !enableDecimalLogicalType) {
+      return "null";
+    }
+
+    final Conversion<Object> conversion = specificData.getConversionFor(schema.getLogicalType());
+    if (conversion != null) {
+      return "new " + conversion.getClass().getCanonicalName() + "()";
     }
 
     return "null";
diff --git a/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm b/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
index f2e3bcf..23b5848 100644
--- a/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
+++ b/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
@@ -42,6 +42,14 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
   public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
 
   private static SpecificData MODEL$ = new SpecificData();
+#set ($usedConversions = $this.getUsedConversionClasses($schema))
+#if (!$usedConversions.isEmpty())
+static {
+#foreach ($conversion in $usedConversions)
+    MODEL$.addLogicalTypeConversion(new ${conversion}());
+#end
+  }
+#end
 
 #if (!$schema.isError())
   private static final BinaryMessageEncoder<${this.mangle($schema.getName())}> ENCODER =
@@ -158,6 +166,7 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
 #end
 
 #end
+  public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
@@ -172,17 +181,6 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
   }
 
 #if ($this.hasLogicalTypeField($schema))
-  protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
-#if ($this.getDateTimeLogicalTypeImplementation().name() == "JODA")
-  protected static final org.apache.avro.data.TimeConversions.DateConversion DATE_CONVERSION = new org.apache.avro.data.TimeConversions.DateConversion();
-  protected static final org.apache.avro.data.TimeConversions.TimeConversion TIME_CONVERSION = new org.apache.avro.data.TimeConversions.TimeConversion();
-  protected static final org.apache.avro.data.TimeConversions.TimestampConversion TIMESTAMP_CONVERSION = new org.apache.avro.data.TimeConversions.TimestampConversion();
-#elseif ($this.getDateTimeLogicalTypeImplementation().name() == "JSR310")
-  protected static final org.apache.avro.data.Jsr310TimeConversions.DateConversion DATE_CONVERSION = new org.apache.avro.data.Jsr310TimeConversions.DateConversion();
-  protected static final org.apache.avro.data.Jsr310TimeConversions.TimeMillisConversion TIME_CONVERSION = new org.apache.avro.data.Jsr310TimeConversions.TimeMillisConversion();
-  protected static final org.apache.avro.data.Jsr310TimeConversions.TimestampMillisConversion TIMESTAMP_CONVERSION = new org.apache.avro.data.Jsr310TimeConversions.TimestampMillisConversion();
-#end
-
   private static final org.apache.avro.Conversion<?>[] conversions =
       new org.apache.avro.Conversion<?>[] {
 #foreach ($field in $schema.getFields())
@@ -502,20 +500,12 @@ public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends or
             throw e;
           }
         } else {
-#if ($this.hasLogicalTypeField($schema))
-          record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : #if(${this.javaType($field.schema())} != "java.lang.Object")(${this.javaType($field.schema())})#{end} defaultValue(fields()[$field.pos()], record.getConversion($field.pos()));
-#else
           record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : #if(${this.javaType($field.schema())} != "java.lang.Object")(${this.javaType($field.schema())})#{end} defaultValue(fields()[$field.pos()]);
-#end
         }
 #else
-#if ($this.hasLogicalTypeField($schema))
-        record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : #if(${this.javaType($field.schema())} != "java.lang.Object")(${this.javaType($field.schema())})#{end} defaultValue(fields()[$field.pos()], record.getConversion($field.pos()));
-#else
         record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : #if(${this.javaType($field.schema())} != "java.lang.Object")(${this.javaType($field.schema())})#{end} defaultValue(fields()[$field.pos()]);
 #end
 #end
-#end
         return record;
       } catch (org.apache.avro.AvroMissingFieldException e) {
         throw e;
diff --git a/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java b/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
index e1210ac..f0cb87a 100644
--- a/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
+++ b/lang/java/compiler/src/test/java/org/apache/avro/compiler/specific/TestSpecificCompiler.java
@@ -475,6 +475,42 @@ public class TestSpecificCompiler {
   }
 
   @Test
+  public void testNullableLogicalTypesJavaUnboxDecimalTypesEnabled() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+    compiler.setEnableDecimalLogicalType(true);
+
+    // Nullable types should return boxed types instead of primitive types
+    Schema nullableDecimalSchema1 = Schema.createUnion(
+      Schema.create(Schema.Type.NULL), LogicalTypes.decimal(9,2)
+        .addToSchema(Schema.create(Schema.Type.BYTES)));
+    Schema nullableDecimalSchema2 = Schema.createUnion(
+      LogicalTypes.decimal(9,2)
+        .addToSchema(Schema.create(Schema.Type.BYTES)), Schema.create(Schema.Type.NULL));
+    Assert.assertEquals("Should return boxed type",
+      compiler.javaUnbox(nullableDecimalSchema1), "java.math.BigDecimal");
+    Assert.assertEquals("Should return boxed type",
+      compiler.javaUnbox(nullableDecimalSchema2), "java.math.BigDecimal");
+  }
+
+  @Test
+  public void testNullableLogicalTypesJavaUnboxDecimalTypesDisabled() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+    compiler.setEnableDecimalLogicalType(false);
+
+    // Since logical decimal types are disabled, a ByteBuffer is expected.
+    Schema nullableDecimalSchema1 = Schema.createUnion(
+      Schema.create(Schema.Type.NULL), LogicalTypes.decimal(9,2)
+        .addToSchema(Schema.create(Schema.Type.BYTES)));
+    Schema nullableDecimalSchema2 = Schema.createUnion(
+      LogicalTypes.decimal(9,2)
+        .addToSchema(Schema.create(Schema.Type.BYTES)), Schema.create(Schema.Type.NULL));
+    Assert.assertEquals("Should return boxed type",
+      compiler.javaUnbox(nullableDecimalSchema1), "java.nio.ByteBuffer");
+    Assert.assertEquals("Should return boxed type",
+      compiler.javaUnbox(nullableDecimalSchema2), "java.nio.ByteBuffer");
+  }
+
+  @Test
   public void testNullableTypesJavaUnbox() throws Exception {
     SpecificCompiler compiler = createCompiler();
     compiler.setEnableDecimalLogicalType(false);
@@ -527,6 +563,76 @@ public class TestSpecificCompiler {
   }
 
   @Test
+  public void testGetUsedConversionClassesForNullableLogicalTypes() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+    compiler.setEnableDecimalLogicalType(true);
+
+    Schema nullableDecimal1 = Schema.createUnion(
+      Schema.create(Schema.Type.NULL), LogicalTypes.decimal(9,2)
+        .addToSchema(Schema.create(Schema.Type.BYTES)));
+    Schema schemaWithNullableDecimal1 = Schema.createRecord("WithNullableDecimal", "", "", false, Collections.singletonList(new Schema.Field("decimal", nullableDecimal1, "", null)));
+
+    final Collection<String> usedConversionClasses = compiler.getUsedConversionClasses(schemaWithNullableDecimal1);
+    Assert.assertEquals(1, usedConversionClasses.size());
+    Assert.assertEquals("org.apache.avro.Conversions.DecimalConversion", usedConversionClasses.iterator().next());
+  }
+
+  @Test
+  public void testGetUsedConversionClassesForNullableLogicalTypesInNestedRecord() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+
+    final Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"NestedLogicalTypesRecord\",\"namespace\":\"org.apache.avro.codegentest.testdata\",\"doc\":\"Test nested types with logical types in generated Java classes\",\"fields\":[{\"name\":\"nestedRecord\",\"type\":{\"type\":\"record\",\"name\":\"NestedRecord\",\"fields\":[{\"name\":\"nullableDateField\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}]}]}}]}");
+
+    final Collection<String> usedConversionClasses = compiler.getUsedConversionClasses(schema);
+    Assert.assertEquals(1, usedConversionClasses.size());
+    Assert.assertEquals("org.apache.avro.data.TimeConversions.DateConversion", usedConversionClasses.iterator().next());
+  }
+
+  @Test
+  public void testGetUsedConversionClassesForNullableLogicalTypesInArray() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+
+    final Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"NullableLogicalTypesArray\",\"namespace\":\"org.apache.avro.codegentest.testdata\",\"doc\":\"Test nested types with logical types in generated Java classes\",\"fields\":[{\"name\":\"arrayOfLogicalType\",\"type\":{\"type\":\"array\",\"items\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}]}}]}");
+
+    final Collection<String> usedConversionClasses = compiler.getUsedConversionClasses(schema);
+    Assert.assertEquals(1, usedConversionClasses.size());
+    Assert.assertEquals("org.apache.avro.data.TimeConversions.DateConversion", usedConversionClasses.iterator().next());
+  }
+
+  @Test
+  public void testGetUsedConversionClassesForNullableLogicalTypesInArrayOfRecords() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+
+    final Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"NestedLogicalTypesArray\",\"namespace\":\"org.apache.avro.codegentest.testdata\",\"doc\":\"Test nested types with logical types in generated Java classes\",\"fields\":[{\"name\":\"arrayOfRecords\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"RecordInArray\",\"fields\":[{\"name\":\"nullableDateField\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}]}]}}}]}");
+
+    final Collection<String> usedConversionClasses = compiler.getUsedConversionClasses(schema);
+    Assert.assertEquals(1, usedConversionClasses.size());
+    Assert.assertEquals("org.apache.avro.data.TimeConversions.DateConversion", usedConversionClasses.iterator().next());
+  }
+
+  @Test
+  public void testGetUsedConversionClassesForNullableLogicalTypesInUnionOfRecords() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+
+    final Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"NestedLogicalTypesUnion\",\"namespace\":\"org.apache.avro.codegentest.testdata\",\"doc\":\"Test nested types with logical types in generated Java classes\",\"fields\":[{\"name\":\"unionOfRecords\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"RecordInUnion\",\"fields\":[{\"name\":\"nullableDateField\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}]}]}]}]}");
+
+    final Collection<String> usedConversionClasses = compiler.getUsedConversionClasses(schema);
+    Assert.assertEquals(1, usedConversionClasses.size());
+    Assert.assertEquals("org.apache.avro.data.TimeConversions.DateConversion", usedConversionClasses.iterator().next());
+  }
+
+  @Test
+  public void testGetUsedConversionClassesForNullableLogicalTypesInMapOfRecords() throws Exception {
+    SpecificCompiler compiler = createCompiler();
+
+    final Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"NestedLogicalTypesMap\",\"namespace\":\"org.apache.avro.codegentest.testdata\",\"doc\":\"Test nested types with logical types in generated Java classes\",\"fields\":[{\"name\":\"mapOfRecords\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"RecordInMap\",\"fields\":[{\"name\":\"nullableDateField\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}]}]},\"avro.java.string\ [...]
+
+    final Collection<String> usedConversionClasses = compiler.getUsedConversionClasses(schema);
+    Assert.assertEquals(1, usedConversionClasses.size());
+    Assert.assertEquals("org.apache.avro.data.TimeConversions.DateConversion", usedConversionClasses.iterator().next());
+  }
+
+  @Test
   public void testLogicalTypesWithMultipleFields() throws Exception {
     Schema logicalTypesWithMultipleFields = new Schema.Parser().parse(
         new File("src/test/resources/logical_types_with_multiple_fields.avsc"));
@@ -566,12 +672,12 @@ public class TestSpecificCompiler {
     Schema uuidSchema = LogicalTypes.uuid()
         .addToSchema(Schema.create(Schema.Type.STRING));
 
-    Assert.assertEquals("Should use DATE_CONVERSION for date type",
-        "DATE_CONVERSION", compiler.conversionInstance(dateSchema));
-    Assert.assertEquals("Should use TIME_CONVERSION for time type",
-        "TIME_CONVERSION", compiler.conversionInstance(timeSchema));
-    Assert.assertEquals("Should use TIMESTAMP_CONVERSION for date type",
-        "TIMESTAMP_CONVERSION", compiler.conversionInstance(timestampSchema));
+    Assert.assertEquals("Should use date conversion for date type",
+        "new org.apache.avro.data.TimeConversions.DateConversion()", compiler.conversionInstance(dateSchema));
+    Assert.assertEquals("Should use time conversion for time type",
+        "new org.apache.avro.data.TimeConversions.TimeConversion()", compiler.conversionInstance(timeSchema));
+    Assert.assertEquals("Should use timestamp conversion for date type",
+        "new org.apache.avro.data.TimeConversions.TimestampConversion()", compiler.conversionInstance(timestampSchema));
     Assert.assertEquals("Should use null for decimal if the flag is off",
         "null", compiler.conversionInstance(decimalSchema));
     Assert.assertEquals("Should use null for decimal if the flag is off",
@@ -595,14 +701,14 @@ public class TestSpecificCompiler {
     Schema uuidSchema = LogicalTypes.uuid()
         .addToSchema(Schema.create(Schema.Type.STRING));
 
-    Assert.assertEquals("Should use DATE_CONVERSION for date type",
-        "DATE_CONVERSION", compiler.conversionInstance(dateSchema));
-    Assert.assertEquals("Should use TIME_CONVERSION for time type",
-        "TIME_CONVERSION", compiler.conversionInstance(timeSchema));
-    Assert.assertEquals("Should use TIMESTAMP_CONVERSION for date type",
-        "TIMESTAMP_CONVERSION", compiler.conversionInstance(timestampSchema));
+    Assert.assertEquals("Should use date conversion for date type",
+            "new org.apache.avro.data.TimeConversions.DateConversion()", compiler.conversionInstance(dateSchema));
+    Assert.assertEquals("Should use time conversion for time type",
+            "new org.apache.avro.data.TimeConversions.TimeConversion()", compiler.conversionInstance(timeSchema));
+    Assert.assertEquals("Should use timestamp conversion for date type",
+            "new org.apache.avro.data.TimeConversions.TimestampConversion()", compiler.conversionInstance(timestampSchema));
     Assert.assertEquals("Should use null for decimal if the flag is off",
-        "DECIMAL_CONVERSION", compiler.conversionInstance(decimalSchema));
+        "new org.apache.avro.Conversions.DecimalConversion()", compiler.conversionInstance(decimalSchema));
     Assert.assertEquals("Should use null for decimal if the flag is off",
         "null", compiler.conversionInstance(uuidSchema));
   }
diff --git a/lang/java/integration-test/codegen-test/pom.xml b/lang/java/integration-test/codegen-test/pom.xml
new file mode 100644
index 0000000..2be8ad7
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>avro-integration-test</artifactId>
+    <groupId>org.apache.avro</groupId>
+    <version>1.9.0-SNAPSHOT</version>
+    <relativePath>../</relativePath>
+  </parent>
+
+  <artifactId>avro-codegen-test</artifactId>
+
+  <name>Apache Avro Codegen Test</name>
+  <packaging>jar</packaging>
+  <url>http://avro.apache.org</url>
+  <description>Tests generated Avro Specific Java API</description>
+  <build>
+     <plugins>
+       <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+         <version>${project.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>schema</goal>
+              <goal>protocol</goal>
+              <goal>idl-protocol</goal>
+            </goals>
+            <configuration>
+              <stringType>String</stringType>
+              <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+              <testOutputDirectory>${project.build.directory}/generated-test-sources/java</testOutputDirectory>
+              <enableDecimalLogicalType>true</enableDecimalLogicalType>
+              <customConversions>
+                <conversion>org.apache.avro.codegentest.CustomDecimalConversion</conversion>
+              </customConversions>
+            </configuration>
+          </execution>
+        </executions>
+         <dependencies>
+           <dependency>
+             <groupId>org.apache.avro</groupId>
+             <artifactId>avro-test-custom-conversions</artifactId>
+             <version>${project.version}</version>
+           </dependency>
+         </dependencies>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro-compiler</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro-test-custom-conversions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/AbstractSpecificRecordTest.java b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/AbstractSpecificRecordTest.java
new file mode 100644
index 0000000..9d8a273
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/AbstractSpecificRecordTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.codegentest;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.junit.Assert;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+abstract class AbstractSpecificRecordTest { // shared round-trip (serialize/deserialize) checks for generated records
+
+  @SuppressWarnings("unchecked") // the casts of original.getClass() to Class<T> below are unchecked
+  <T extends SpecificRecordBase> void verifySerDeAndStandardMethods(T original) {
+    final SpecificDatumWriter<T> datumWriterFromSchema = new SpecificDatumWriter<>(original.getSchema());
+    final SpecificDatumReader<T> datumReaderFromSchema = new SpecificDatumReader<>(original.getSchema(), original.getSchema());
+    verifySerDeAndStandardMethods(original, datumWriterFromSchema, datumReaderFromSchema);
+    final SpecificDatumWriter<T> datumWriterFromClass = new SpecificDatumWriter<>((Class<T>) original.getClass());
+    final SpecificDatumReader<T> datumReaderFromClass = new SpecificDatumReader<>((Class<T>) original.getClass());
+    verifySerDeAndStandardMethods(original, datumWriterFromClass, datumReaderFromClass);
+  }
+
+  private <T extends SpecificRecordBase> void verifySerDeAndStandardMethods(T original, SpecificDatumWriter<T> datumWriter, SpecificDatumReader<T> datumReader) {
+    final byte[] serialized = serialize(original, datumWriter);
+    final T copy = deserialize(serialized, datumReader);
+    Assert.assertEquals(original, copy);
+    // equals() is checked above; hashCode() and toString() are checked below because they also rely on SpecificData.
+    // compareTo() is deliberately not asserted: it throws an exception for maps.
+    // (Disabled check: Assert.assertEquals(0, original.compareTo(copy));)
+    Assert.assertEquals(original.hashCode(), copy.hashCode());
+    Assert.assertEquals(original.toString(), copy.toString());
+  }
+
+  private <T extends SpecificRecordBase> byte[] serialize(T object, SpecificDatumWriter<T> datumWriter) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try {
+      datumWriter.write(object, EncoderFactory.get().directBinaryEncoder(outputStream, null));
+      return outputStream.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private <T extends SpecificRecordBase> T deserialize(byte[] bytes, SpecificDatumReader<T> datumReader) {
+    try {
+      final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+      return datumReader.read(null, DecoderFactory.get().directBinaryDecoder(byteArrayInputStream, null));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestCustomConversion.java b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestCustomConversion.java
new file mode 100644
index 0000000..55e60a2
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestCustomConversion.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.codegentest;
+
+import org.apache.avro.codegentest.testdata.LogicalTypesWithCustomConversion;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+public class TestCustomConversion extends AbstractSpecificRecordTest {
+
+    @Test
+    public void testNullValues() throws IOException {
+        LogicalTypesWithCustomConversion instanceOfGeneratedClass = LogicalTypesWithCustomConversion.newBuilder()
+                .setNonNullCustomField(new CustomDecimal(BigInteger.valueOf(100), 2))
+                .build();
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+
+    @Test
+    public void testNonNullValues() throws IOException {
+        LogicalTypesWithCustomConversion instanceOfGeneratedClass = LogicalTypesWithCustomConversion.newBuilder()
+                .setNonNullCustomField(new CustomDecimal(BigInteger.valueOf(100), 2))
+                .setNullableCustomField(new CustomDecimal(BigInteger.valueOf(3000), 2))
+                .build();
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+}
diff --git a/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestLogicalTypesWithDefaults.java b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestLogicalTypesWithDefaults.java
new file mode 100644
index 0000000..c2d2d6d
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestLogicalTypesWithDefaults.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.codegentest;
+
+import org.apache.avro.codegentest.testdata.LogicalTypesWithDefaults;
+import org.joda.time.LocalDate;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestLogicalTypesWithDefaults extends AbstractSpecificRecordTest {
+
+    private static final LocalDate DEFAULT_VALUE = LocalDate.parse("1973-05-19");
+
+    @Test
+    public void testDefaultValueOfNullableField() throws IOException {
+        LogicalTypesWithDefaults instanceOfGeneratedClass = LogicalTypesWithDefaults.newBuilder()
+                .setNonNullDate(LocalDate.now())
+                .build();
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+
+    @Test
+    public void testDefaultValueOfNonNullField() throws IOException {
+        LogicalTypesWithDefaults instanceOfGeneratedClass = LogicalTypesWithDefaults.newBuilder()
+                .setNullableDate(LocalDate.now())
+                .build();
+        Assert.assertEquals(DEFAULT_VALUE, instanceOfGeneratedClass.getNonNullDate());
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+
+    @Test
+    public void testWithValues() throws IOException {
+        LogicalTypesWithDefaults instanceOfGeneratedClass = LogicalTypesWithDefaults.newBuilder()
+                .setNullableDate(LocalDate.now())
+                .setNonNullDate(LocalDate.now())
+                .build();
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+
+}
diff --git a/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestNestedLogicalTypes.java b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestNestedLogicalTypes.java
new file mode 100644
index 0000000..a33d038
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestNestedLogicalTypes.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.codegentest;
+
+import org.apache.avro.codegentest.testdata.*;
+import org.joda.time.LocalDate;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class TestNestedLogicalTypes extends AbstractSpecificRecordTest {
+
+  @Test
+  public void testNullableLogicalTypeInNestedRecord() {
+    final NestedLogicalTypesRecord nestedLogicalTypesRecord =
+      NestedLogicalTypesRecord.newBuilder()
+        .setNestedRecord(NestedRecord.newBuilder()
+          .setNullableDateField(LocalDate.now()).build()).build();
+    verifySerDeAndStandardMethods(nestedLogicalTypesRecord);
+  }
+
+  @Test
+  public void testNullableLogicalTypeInArray() {
+    final NullableLogicalTypesArray logicalTypesArray =
+      NullableLogicalTypesArray.newBuilder().setArrayOfLogicalType(Collections.singletonList(LocalDate.now())).build();
+    verifySerDeAndStandardMethods(logicalTypesArray);
+  }
+
+  @Test
+  public void testNullableLogicalTypeInRecordInArray() {
+    final NestedLogicalTypesArray nestedLogicalTypesArray =
+      NestedLogicalTypesArray.newBuilder().setArrayOfRecords(Collections.singletonList(
+        RecordInArray.newBuilder().setNullableDateField(LocalDate.now()).build())).build();
+    verifySerDeAndStandardMethods(nestedLogicalTypesArray);
+  }
+
+  @Test
+  public void testNullableLogicalTypeInRecordInUnion() {
+    final NestedLogicalTypesUnion nestedLogicalTypesUnion =
+      NestedLogicalTypesUnion.newBuilder().setUnionOfRecords(
+        RecordInUnion.newBuilder().setNullableDateField(LocalDate.now()).build()).build();
+    verifySerDeAndStandardMethods(nestedLogicalTypesUnion);
+  }
+
+  @Test
+  public void testNullableLogicalTypeInRecordInMap() {
+    final NestedLogicalTypesMap nestedLogicalTypesMap =
+      NestedLogicalTypesMap.newBuilder().setMapOfRecords(Collections.singletonMap("key",
+        RecordInMap.newBuilder().setNullableDateField(LocalDate.now()).build())).build();
+    verifySerDeAndStandardMethods(nestedLogicalTypesMap);
+  }
+}
diff --git a/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestNullableLogicalTypes.java b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestNullableLogicalTypes.java
new file mode 100644
index 0000000..3a44174
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/java/org/apache/avro/codegentest/TestNullableLogicalTypes.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.codegentest;
+
+import org.apache.avro.codegentest.testdata.NullableLogicalTypes;
+import org.joda.time.LocalDate;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestNullableLogicalTypes extends AbstractSpecificRecordTest {
+
+    @Test
+    public void testWithNullValues() throws IOException {
+        NullableLogicalTypes instanceOfGeneratedClass = NullableLogicalTypes.newBuilder()
+                .setNullableDate(null)
+                .build();
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+
+    @Test
+    public void testDate() throws IOException {
+        NullableLogicalTypes instanceOfGeneratedClass = NullableLogicalTypes.newBuilder()
+                .setNullableDate(LocalDate.now())
+                .build();
+        verifySerDeAndStandardMethods(instanceOfGeneratedClass);
+    }
+
+}
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/custom_conversion.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/custom_conversion.avsc
new file mode 100644
index 0000000..ff33c39
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/custom_conversion.avsc
@@ -0,0 +1,12 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "LogicalTypesWithCustomConversion",
+  "doc" : "Test unions with logical types in generated Java classes",
+  "fields": [
+    {"name": "nullableCustomField",  "type": ["null", {"type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 2}], "default": null},
+    {"name": "nonNullCustomField",  "type": {"type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 2}}
+  ]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/logical_types_with_default_values.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/logical_types_with_default_values.avsc
new file mode 100644
index 0000000..d164b0a
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/logical_types_with_default_values.avsc
@@ -0,0 +1,12 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "LogicalTypesWithDefaults",
+  "doc" : "Test logical types and default values in generated Java classes",
+  "fields": [
+    {"name": "nullableDate",  "type": [{"type": "int", "logicalType": "date"}, "null"], "default": 1234},
+    {"name": "nonNullDate",  "type": {"type": "int", "logicalType": "date"}, "default": 1234}
+  ]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_array.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_array.avsc
new file mode 100644
index 0000000..c5eba14
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_array.avsc
@@ -0,0 +1,26 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "NestedLogicalTypesArray",
+  "doc" : "Test nested types with logical types in generated Java classes",
+  "fields": [
+    {
+      "name": "arrayOfRecords",
+      "type": {
+        "type": "array",
+        "items": {
+          "namespace": "org.apache.avro.codegentest.testdata",
+          "name": "RecordInArray",
+          "type": "record",
+          "fields": [
+            {
+              "name": "nullableDateField",
+              "type": ["null", {"type": "int", "logicalType": "date"}]
+            }
+          ]
+        }
+      }
+    }]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_map.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_map.avsc
new file mode 100644
index 0000000..f99e457
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_map.avsc
@@ -0,0 +1,26 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "NestedLogicalTypesMap",
+  "doc" : "Test nested types with logical types in generated Java classes",
+  "fields": [
+    {
+      "name": "mapOfRecords",
+      "type": {
+        "type": "map",
+        "values": {
+          "namespace": "org.apache.avro.codegentest.testdata",
+          "name": "RecordInMap",
+          "type": "record",
+          "fields": [
+            {
+              "name": "nullableDateField",
+              "type": ["null", {"type": "int", "logicalType": "date"}]
+            }
+          ]
+        }
+      }
+    }]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_record.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_record.avsc
new file mode 100644
index 0000000..d51ac86
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_record.avsc
@@ -0,0 +1,23 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "NestedLogicalTypesRecord",
+  "doc" : "Test nested types with logical types in generated Java classes",
+  "fields": [
+    {
+      "name": "nestedRecord",
+      "type": {
+        "namespace": "org.apache.avro.codegentest.testdata",
+        "type": "record",
+        "name": "NestedRecord",
+        "fields": [
+          {
+            "name": "nullableDateField",
+            "type": ["null", {"type": "int", "logicalType": "date"}]
+          }
+        ]
+      }
+    }]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_union.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_union.avsc
new file mode 100644
index 0000000..44a495c
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/nested_logical_types_union.avsc
@@ -0,0 +1,23 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "NestedLogicalTypesUnion",
+  "doc" : "Test nested types with logical types in generated Java classes",
+  "fields": [
+    {
+      "name": "unionOfRecords",
+      "type": ["null", {
+        "namespace": "org.apache.avro.codegentest.testdata",
+        "name": "RecordInUnion",
+        "type": "record",
+        "fields": [
+          {
+            "name": "nullableDateField",
+            "type": ["null", {"type": "int", "logicalType": "date"}]
+          }
+        ]
+      }]
+    }]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/nullable_logical_types.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/nullable_logical_types.avsc
new file mode 100644
index 0000000..0133133
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/nullable_logical_types.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "NullableLogicalTypes",
+  "doc" : "Test unions with logical types in generated Java classes",
+  "fields": [
+    {"name": "nullableDate",  "type": ["null", {"type": "int", "logicalType": "date"}], "default": null}
+  ]
+}
+
+
+
diff --git a/lang/java/integration-test/codegen-test/src/test/resources/avro/nullable_logical_types_array.avsc b/lang/java/integration-test/codegen-test/src/test/resources/avro/nullable_logical_types_array.avsc
new file mode 100644
index 0000000..8e5cade
--- /dev/null
+++ b/lang/java/integration-test/codegen-test/src/test/resources/avro/nullable_logical_types_array.avsc
@@ -0,0 +1,16 @@
+{"namespace": "org.apache.avro.codegentest.testdata",
+  "type": "record",
+  "name": "NullableLogicalTypesArray",
+  "doc" : "Test nested types with logical types in generated Java classes",
+  "fields": [
+    {
+      "name": "arrayOfLogicalType",
+      "type": {
+        "type": "array",
+        "items": ["null", {"type": "int", "logicalType": "date"}]
+      }
+    }]
+}
+
+
+
diff --git a/lang/java/integration-test/pom.xml b/lang/java/integration-test/pom.xml
new file mode 100644
index 0000000..226a0dc
--- /dev/null
+++ b/lang/java/integration-test/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>avro-parent</artifactId>
+        <groupId>org.apache.avro</groupId>
+        <version>1.9.0-SNAPSHOT</version>
+        <relativePath>../</relativePath>
+    </parent>
+
+    <artifactId>avro-integration-test</artifactId>
+    <name>Avro Integration Tests</name>
+    <description>Integration tests for code generation or other things that are hard to test within the modules without creating circular Maven dependencies.</description>
+    <url>http://avro.apache.org/</url>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>codegen-test</module>
+        <module>test-custom-conversions</module>
+    </modules>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>${surefire-plugin.version}</version>
+                    <configuration>
+                        <failIfNoTests>false</failIfNoTests>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>${compiler-plugin.version}</version>
+                    <configuration>
+                        <source>1.8</source>
+                        <target>1.8</target>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                    <version>${checkstyle-plugin.version}</version>
+                    <configuration>
+                        <consoleOutput>true</consoleOutput>
+                        <configLocation>checkstyle.xml</configLocation>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <id>checkstyle-check</id>
+                            <phase>test</phase>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>${jar-plugin.version}</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>test-jar</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+    <profiles>
+    </profiles>
+
+</project>
+
diff --git a/lang/java/integration-test/test-custom-conversions/pom.xml b/lang/java/integration-test/test-custom-conversions/pom.xml
new file mode 100644
index 0000000..7bac7ae
--- /dev/null
+++ b/lang/java/integration-test/test-custom-conversions/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>avro-integration-test</artifactId>
+    <groupId>org.apache.avro</groupId>
+    <version>1.9.0-SNAPSHOT</version>
+    <relativePath>../</relativePath>
+  </parent>
+
+  <artifactId>avro-test-custom-conversions</artifactId>
+
+  <name>Apache Avro Codegen Test dependencies</name>
+  <packaging>jar</packaging>
+  <url>http://avro.apache.org</url>
+  <description>Contains dependencies for the maven plugin used in avro-codegen-test</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/lang/java/integration-test/test-custom-conversions/src/main/java/org.apache.avro.codegentest/CustomDecimal.java b/lang/java/integration-test/test-custom-conversions/src/main/java/org.apache.avro.codegentest/CustomDecimal.java
new file mode 100644
index 0000000..1d4f40c
--- /dev/null
+++ b/lang/java/integration-test/test-custom-conversions/src/main/java/org.apache.avro.codegentest/CustomDecimal.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.codegentest;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+
+/**
+ * Minimal immutable wrapper around a {@link BigDecimal}, used to demonstrate that it is possible to use custom
+ * implementation classes together with custom conversions.
+ *
+ * <p>Note that {@link #equals(Object)} delegates to {@link BigDecimal#equals(Object)} and is therefore
+ * scale-sensitive, whereas {@link #compareTo(CustomDecimal)} delegates to
+ * {@link BigDecimal#compareTo(BigDecimal)} and ignores the scale. Two instances can thus compare as {@code 0}
+ * without being equal.
+ */
+public class CustomDecimal implements Comparable<CustomDecimal> {
+
+    private final BigDecimal internalValue;
+
+    /**
+     * Creates a decimal equal to {@code value * 10^-scale}.
+     *
+     * @param value the unscaled value
+     * @param scale the scale to apply to {@code value}
+     */
+    public CustomDecimal(BigInteger value, int scale) {
+        internalValue = new BigDecimal(value, scale);
+    }
+
+    /**
+     * Returns the unscaled value of this decimal, expressed at the requested scale, as produced by
+     * {@link BigInteger#toByteArray()}.
+     *
+     * @param scale the scale the returned unscaled value must correspond to; if it differs from the scale of
+     *              the wrapped value, the value is rescaled first, rounding half-up when digits are dropped
+     * @return the bytes of the (possibly rescaled) unscaled value
+     */
+    public byte[] toByteArray(int scale) {
+        // setScale returns the same value when the scale already matches, so no explicit scale check is needed.
+        // RoundingMode.HALF_UP replaces the deprecated int constant BigDecimal.ROUND_HALF_UP.
+        final BigDecimal correctlyScaledValue = internalValue.setScale(scale, RoundingMode.HALF_UP);
+        return correctlyScaledValue.unscaledValue().toByteArray();
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and their wrapped values are equal according to
+     * {@link BigDecimal#equals(Object)} (same value and same scale).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        CustomDecimal that = (CustomDecimal) o;
+
+        return internalValue.equals(that.internalValue);
+    }
+
+    @Override
+    public int hashCode() {
+        return internalValue.hashCode();
+    }
+
+    /**
+     * Orders instances by the numerical value of the wrapped decimals, ignoring scale.
+     */
+    @Override
+    public int compareTo(CustomDecimal o) {
+        return this.internalValue.compareTo(o.internalValue);
+    }
+}
diff --git a/lang/java/integration-test/test-custom-conversions/src/main/java/org.apache.avro.codegentest/CustomDecimalConversion.java b/lang/java/integration-test/test-custom-conversions/src/main/java/org.apache.avro.codegentest/CustomDecimalConversion.java
new file mode 100644
index 0000000..7c200ad
--- /dev/null
+++ b/lang/java/integration-test/test-custom-conversions/src/main/java/org.apache.avro.codegentest/CustomDecimalConversion.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.codegentest;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+/**
+ * Conversion between Avro values carrying the {@code decimal} logical type and {@link CustomDecimal}.
+ * Only the {@code bytes} representation is handled by this class.
+ */
+public class CustomDecimalConversion extends Conversion<CustomDecimal> {
+
+    @Override
+    public Class<CustomDecimal> getConvertedType() {
+        return CustomDecimal.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+        return "decimal";
+    }
+
+    /**
+     * Builds a {@link CustomDecimal} from the remaining bytes of {@code value}, interpreted as an unscaled
+     * {@link BigInteger}, using the scale declared by the logical type.
+     *
+     * @param value  buffer whose remaining bytes hold the unscaled value; its position is left untouched
+     * @param schema the schema of the value (not used)
+     * @param type   the logical type; must be a {@link LogicalTypes.Decimal}
+     * @return the decoded decimal
+     */
+    @Override
+    public CustomDecimal fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
+        int scale = ((LogicalTypes.Decimal)type).getScale();
+        // ByteBuffer.get(byte[]) returns the buffer itself, so chaining .array() on it would hand back the
+        // buffer's whole backing array (wrong for sliced/offset buffers, unsupported for read-only or direct
+        // ones). Copy exactly the remaining bytes into our own array instead, reading through a duplicate so
+        // the caller's buffer is not consumed.
+        byte[] bytes = new byte[value.remaining()];
+        value.duplicate().get(bytes);
+        return new CustomDecimal(new BigInteger(bytes), scale);
+    }
+
+    /**
+     * Encodes {@code value} as the bytes of its unscaled value at the scale declared by the logical type.
+     *
+     * @param value  the decimal to encode
+     * @param schema the schema of the value (not used)
+     * @param type   the logical type; must be a {@link LogicalTypes.Decimal}
+     * @return a buffer wrapping the encoded bytes
+     */
+    @Override
+    public ByteBuffer toBytes(CustomDecimal value, Schema schema, LogicalType type) {
+        int scale = ((LogicalTypes.Decimal)type).getScale();
+        return ByteBuffer.wrap(value.toByteArray(scale));
+    }
+
+}
diff --git a/lang/java/mapred/pom.xml b/lang/java/mapred/pom.xml
index 4309b67..5bffd26 100644
--- a/lang/java/mapred/pom.xml
+++ b/lang/java/mapred/pom.xml
@@ -49,6 +49,18 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>${jar-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>${project.groupId}</groupId>
         <artifactId>avro-maven-plugin</artifactId>
         <version>${project.version}</version>
diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java
index 23b77d5..0d7fbee 100644
--- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java
+++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java
@@ -20,10 +20,16 @@ package org.apache.avro.mojo;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.avro.compiler.specific.SpecificCompiler;
 import org.apache.avro.compiler.specific.SpecificCompiler.DateTimeLogicalTypeImplementation;
+import org.apache.maven.artifact.DependencyResolutionRequiredException;
 import org.apache.maven.plugin.AbstractMojo;
 import org.apache.maven.plugin.MojoExecutionException;
 import org.apache.maven.project.MavenProject;
@@ -142,6 +148,14 @@ public abstract class AbstractAvroMojo extends AbstractMojo {
   protected boolean createSetters;
 
   /**
+   * A set of fully qualified class names of custom {@link org.apache.avro.Conversion} implementations to add to the compiler.
+   * The classes must be on the classpath at compile time and whenever the Java objects are serialized.
+   *
+   * @parameter property="customConversions"
+   */
+  protected String[] customConversions = new String[0];
+
+  /**
    * Determines whether or not to use Java classes for decimal types
    *
    * @parameter default-value="false"
@@ -282,6 +296,24 @@ public abstract class AbstractAvroMojo extends AbstractMojo {
 
   protected abstract void doCompile(String filename, File sourceDirectory, File outputDirectory) throws IOException;
 
+  protected URLClassLoader createClassLoader() throws DependencyResolutionRequiredException, MalformedURLException {
+    List<URL> urls = appendElements(project.getRuntimeClasspathElements());
+    urls.addAll(appendElements(project.getTestClasspathElements()));
+    return new URLClassLoader(urls.toArray(new URL[urls.size()]),
+            Thread.currentThread().getContextClassLoader());
+  }
+
+  /**
+   * Converts classpath elements (file system paths given as strings) into URLs.
+   *
+   * @param classpathElements the path strings to convert; may be {@code null}, in which case an empty list
+   *                          is returned
+   * @return a new, mutable list holding one URL per classpath element, in the same order
+   * @throws MalformedURLException if an element cannot be converted to a URL
+   */
+  private List<URL> appendElements(List<?> classpathElements) throws MalformedURLException {
+    List<URL> urls = new ArrayList<>();
+    if (classpathElements != null) {
+      // An unbounded wildcard replaces the raw List type; elements are still expected to be path strings.
+      for (Object classpathElement : classpathElements) {
+        String element = (String) classpathElement;
+        urls.add(new File(element).toURI().toURL());
+      }
+    }
+    return urls;
+  }
+
   protected abstract String[] getIncludes();
 
   protected abstract String[] getTestIncludes();
diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLProtocolMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLProtocolMojo.java
index da1ae33..9730901 100644
--- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLProtocolMojo.java
+++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLProtocolMojo.java
@@ -82,7 +82,6 @@ public class IDLProtocolMojo extends AbstractAvroMojo {
 
       URLClassLoader projPathLoader = new URLClassLoader
           (runtimeUrls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
-
       try (Idl parser = new Idl(new File(sourceDirectory, filename), projPathLoader)) {
 
         Protocol p = parser.CompilationUnit();
@@ -96,6 +95,9 @@ public class IDLProtocolMojo extends AbstractAvroMojo {
         compiler.setGettersReturnOptional(gettersReturnOptional);
         compiler.setCreateSetters(createSetters);
         compiler.setEnableDecimalLogicalType(enableDecimalLogicalType);
+        for (String customConversion : customConversions) {
+          compiler.addCustomConversion(projPathLoader.loadClass(customConversion));
+        }
         compiler.setOutputCharacterEncoding(project.getProperties().getProperty("project.build.sourceEncoding"));
         compiler.compileToDestination(null, outputDirectory);
       }
@@ -103,6 +105,8 @@ public class IDLProtocolMojo extends AbstractAvroMojo {
       throw new IOException(e);
     } catch (DependencyResolutionRequiredException drre) {
       throw new IOException(drre);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
     }
   }
 
diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/ProtocolMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/ProtocolMojo.java
index d4b3bf5..ab78903 100644
--- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/ProtocolMojo.java
+++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/ProtocolMojo.java
@@ -22,15 +22,18 @@ import org.apache.avro.generic.GenericData.StringType;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URLClassLoader;
 
 import org.apache.avro.Protocol;
 import org.apache.avro.compiler.specific.SpecificCompiler;
+import org.apache.maven.artifact.DependencyResolutionRequiredException;
 
 /**
  * Generate Java classes and interfaces from Avro protocol files (.avpr)
  *
  * @goal protocol
  * @phase generate-sources
+ * @requiresDependencyResolution runtime
  * @threadSafe
  */
 public class ProtocolMojo extends AbstractAvroMojo {
@@ -64,6 +67,17 @@ public class ProtocolMojo extends AbstractAvroMojo {
     compiler.setGettersReturnOptional(gettersReturnOptional);
     compiler.setCreateSetters(createSetters);
     compiler.setEnableDecimalLogicalType(enableDecimalLogicalType);
+    final URLClassLoader classLoader;
+    try {
+      classLoader = createClassLoader();
+      for (String customConversion : customConversions) {
+        compiler.addCustomConversion(classLoader.loadClass(customConversion));
+      }
+    } catch (DependencyResolutionRequiredException e) {
+      throw new IOException(e);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
     compiler.setOutputCharacterEncoding(project.getProperties().getProperty("project.build.sourceEncoding"));
     compiler.compileToDestination(src, outputDirectory);
   }
diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java
index 9b4840c..55eb96a 100644
--- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java
+++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java
@@ -22,15 +22,18 @@ import org.apache.avro.generic.GenericData.StringType;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URLClassLoader;
 
 import org.apache.avro.Schema;
 import org.apache.avro.compiler.specific.SpecificCompiler;
+import org.apache.maven.artifact.DependencyResolutionRequiredException;
 
 /**
  * Generate Java classes from Avro schema files (.avsc)
  *
  * @goal schema
  * @phase generate-sources
+ * @requiresDependencyResolution runtime+test
  * @threadSafe
  */
 public class SchemaMojo extends AbstractAvroMojo {
@@ -81,6 +84,16 @@ public class SchemaMojo extends AbstractAvroMojo {
     compiler.setGettersReturnOptional(gettersReturnOptional);
     compiler.setCreateSetters(createSetters);
     compiler.setEnableDecimalLogicalType(enableDecimalLogicalType);
+    try {
+      final URLClassLoader classLoader = createClassLoader();
+      for (String customConversion : customConversions) {
+        compiler.addCustomConversion(classLoader.loadClass(customConversion));
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } catch (DependencyResolutionRequiredException e) {
+      throw new IOException(e);
+    }
     compiler.setOutputCharacterEncoding(project.getProperties().getProperty("project.build.sourceEncoding"));
     compiler.compileToDestination(src, outputDirectory);
   }
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index 9d64b6e..c29f155 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -94,6 +94,7 @@
     <module>thrift</module>
     <module>archetypes</module>
     <module>grpc</module>
+    <module>integration-test</module>
   </modules>
 
   <build>
diff --git a/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java b/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
index 8f8a9a4..483479e 100644
--- a/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
+++ b/lang/java/tools/src/test/compiler/output-string/avro/examples/baseball/Player.java
@@ -99,6 +99,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
     this.position = position;
   }
 
+  public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
diff --git a/lang/java/tools/src/test/compiler/output/Player.java b/lang/java/tools/src/test/compiler/output/Player.java
index f20af1d..c8eb70e 100644
--- a/lang/java/tools/src/test/compiler/output/Player.java
+++ b/lang/java/tools/src/test/compiler/output/Player.java
@@ -99,6 +99,7 @@ public class Player extends org.apache.avro.specific.SpecificRecordBase implemen
     this.position = position;
   }
 
+  public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {