You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/02/03 10:01:56 UTC

[avro] branch branch-1.9 updated: AVRO-2247 - improved java reading performance with new reader (#391)

This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new cbc6e50  AVRO-2247 - improved java reading performance with new reader (#391)
cbc6e50 is described below

commit cbc6e500710864545a1b2f9ffa28edef532e26af
Author: Martin Jubelgas <un...@wolke7.net>
AuthorDate: Mon Feb 3 09:37:02 2020 +0100

    AVRO-2247 - improved java reading performance with new reader (#391)
    
    * AVRO-2247 - Add FastDatumReaderBuilder and dependencies (rebased)
    
    * Addressed comments to pull request
---
 lang/java/avro/pom.xml                             |  12 +
 .../src/main/java/org/apache/avro/Resolver.java    |  21 +-
 .../java/org/apache/avro/generic/GenericData.java  |  70 ++-
 .../apache/avro/generic/GenericDatumReader.java    |  24 +-
 .../java/org/apache/avro/io/FastReaderBuilder.java | 613 +++++++++++++++++++++
 .../java/org/apache/avro/io/ReflectionUtils.java   |  73 +++
 .../avro/io/parsing/ResolvingGrammarGenerator.java |   2 +-
 .../org/apache/avro/specific/SpecificData.java     |  30 +-
 .../src/main/java/org/apache/avro/util/Utf8.java   |  14 +-
 9 files changed, 833 insertions(+), 26 deletions(-)

diff --git a/lang/java/avro/pom.xml b/lang/java/avro/pom.xml
index 4953fce..dddead4 100644
--- a/lang/java/avro/pom.xml
+++ b/lang/java/avro/pom.xml
@@ -86,6 +86,18 @@
               </systemPropertyVariables>
             </configuration>
           </execution>
+          <execution>
+            <id>test-with-fast-reader</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <systemPropertyVariables>
+                <org.apache.avro.fastread>true</org.apache.avro.fastread>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
     </plugins>
diff --git a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java
index fead519..45f2a89 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java
@@ -221,6 +221,7 @@ public class Resolver {
       this.error = e;
     }
 
+    @Override
     public String toString() {
       switch (this.error) {
       case INCOMPATIBLE_SCHEMA_TYPES:
@@ -365,9 +366,10 @@ public class Resolver {
    */
   public static class EnumAdjust extends Action {
     public final int[] adjustments;
+    public final Object[] values;
     public final boolean noAdjustmentsNeeded;
 
-    private EnumAdjust(Schema w, Schema r, GenericData d, int[] adj) {
+    private EnumAdjust(Schema w, Schema r, GenericData d, int[] adj, Object[] values) {
       super(w, r, d, Action.Type.ENUM);
       this.adjustments = adj;
       boolean noAdj;
@@ -378,6 +380,7 @@ public class Resolver {
         noAdj &= (i == adj[i]);
       }
       this.noAdjustmentsNeeded = noAdj;
+      this.values = values;
     }
 
     /**
@@ -393,11 +396,17 @@ public class Resolver {
       final List<String> rsymbols = r.getEnumSymbols();
       final int defaultIndex = (r.getEnumDefault() == null ? -1 : rsymbols.indexOf(r.getEnumDefault()));
       int[] adjustments = new int[wsymbols.size()];
+      Object[] values = new Object[wsymbols.size()];
+      Object defaultValue = (defaultIndex == -1) ? null : d.createEnum(r.getEnumDefault(), r);
       for (int i = 0; i < adjustments.length; i++) {
         int j = rsymbols.indexOf(wsymbols.get(i));
-        adjustments[i] = (0 <= j ? j : defaultIndex);
+        if (j < 0) {
+          j = defaultIndex;
+        }
+        adjustments[i] = j;
+        values[i] = (j == defaultIndex) ? defaultValue : d.createEnum(rsymbols.get(j), r);
       }
-      return new EnumAdjust(w, r, d, adjustments);
+      return new EnumAdjust(w, r, d, adjustments, values);
     }
   }
 
@@ -455,6 +464,11 @@ public class Resolver {
     public final Object[] defaults;
 
     /**
+     * Supplier that offers an optimized alternative to data.newRecord()
+     */
+    public final GenericData.InstanceSupplier instanceSupplier;
+
+    /**
      * Returns true iff <code>i&nbsp;==&nbsp;readerOrder[i].pos()</code> for all
      * indices <code>i</code>. Which is to say: the order of the reader's fields is
      * the same in both the reader's and writer's schema.
@@ -473,6 +487,7 @@ public class Resolver {
       this.readerOrder = ro;
       this.firstDefault = firstD;
       this.defaults = defaults;
+      this.instanceSupplier = d.getNewRecordSupplier(r);
     }
 
     /**
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
index 1aa50ac..8a3b437 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
@@ -17,9 +17,9 @@
  */
 package org.apache.avro.generic;
 
-import java.nio.ByteBuffer;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.AbstractList;
 import java.util.Arrays;
@@ -27,8 +27,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.WeakHashMap;
@@ -47,10 +47,11 @@ import org.apache.avro.UnresolvedUnionException;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.FastReaderBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.util.internal.Accessor;
 
@@ -178,6 +179,26 @@ public class GenericData {
     return (Conversion<Object>) conversions.get(logicalType.getName());
   }
 
+  public static final String FAST_READER_PROP = "org.apache.avro.fastread";
+  private boolean fastReaderEnabled = "true".equalsIgnoreCase(System.getProperty(FAST_READER_PROP));
+  private FastReaderBuilder fastReaderBuilder = null;
+
+  public GenericData setFastReaderEnabled(boolean flag) {
+    this.fastReaderEnabled = flag;
+    return this;
+  }
+
+  public boolean isFastReaderEnabled() {
+    return fastReaderEnabled && FastReaderBuilder.isSupportedData(this);
+  }
+
+  public FastReaderBuilder getFastReaderBuilder() {
+    if (fastReaderBuilder == null) {
+      fastReaderBuilder = new FastReaderBuilder(this);
+    }
+    return this.fastReaderBuilder;
+  }
+
   /**
    * Default implementation of {@link GenericRecord}. Note that this
    * implementation does not fill in default values for fields if they are not
@@ -520,7 +541,7 @@ public class GenericData {
 
   /** Returns a {@link DatumReader} for this kind of data. */
   public DatumReader createDatumReader(Schema schema) {
-    return new GenericDatumReader(schema, schema, this);
+    return createDatumReader(schema, schema);
   }
 
   /** Returns a {@link DatumReader} for this kind of data. */
@@ -1329,4 +1350,43 @@ public class GenericData {
     return new GenericData.Record(schema);
   }
 
+  /**
+   * Called to create new array instances. Subclasses may override to use a
+   * different array implementation. By default, this returns a
+   * {@link GenericData.Array}.
+   */
+  public Object newArray(Object old, int size, Schema schema) {
+    if (old instanceof GenericArray) {
+      ((GenericArray<?>) old).reset();
+      return old;
+    } else if (old instanceof Collection) {
+      ((Collection<?>) old).clear();
+      return old;
+    } else
+      return new GenericData.Array<Object>(size, schema);
+  }
+
+  /**
+   * Called to create new array instances. Subclasses may override to use a
+   * different map implementation. By default, this returns a {@link HashMap}.
+   */
+  public Object newMap(Object old, int size) {
+    if (old instanceof Map) {
+      ((Map<?, ?>) old).clear();
+      return old;
+    } else
+      return new HashMap<>(size);
+  }
+
+  /**
+   * create a supplier that allows to get new record instances for a given schema
+   * in an optimized way
+   */
+  public InstanceSupplier getNewRecordSupplier(Schema schema) {
+    return this::newRecord;
+  }
+
+  public interface InstanceSupplier {
+    public Object newInstance(Object oldInstance, Schema schema);
+  }
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
index 89df601..acf1ae9 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
@@ -45,6 +45,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
   private final GenericData data;
   private Schema actual;
   private Schema expected;
+  private DatumReader<D> fastDatumReader = null;
 
   private ResolvingDecoder creatorResolver = null;
   private final Thread creator;
@@ -91,6 +92,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
       expected = actual;
     }
     creatorResolver = null;
+    fastDatumReader = null;
   }
 
   /** Get the reader's schema. */
@@ -140,6 +142,13 @@ public class GenericDatumReader<D> implements DatumReader<D> {
   @Override
   @SuppressWarnings("unchecked")
   public D read(D reuse, Decoder in) throws IOException {
+    if (data.isFastReaderEnabled()) {
+      if (this.fastDatumReader == null) {
+        this.fastDatumReader = data.getFastReaderBuilder().createDatumReader(actual, expected);
+      }
+      return fastDatumReader.read(reuse, in);
+    }
+
     ResolvingDecoder resolver = getResolver(actual, expected);
     resolver.configure(in);
     D result = (D) read(reuse, expected, resolver);
@@ -432,14 +441,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
    */
   @SuppressWarnings("unchecked")
   protected Object newArray(Object old, int size, Schema schema) {
-    if (old instanceof GenericArray) {
-      ((GenericArray) old).reset();
-      return old;
-    } else if (old instanceof Collection) {
-      ((Collection) old).clear();
-      return old;
-    } else
-      return new GenericData.Array(size, schema);
+    return data.newArray(old, size, schema);
   }
 
   /**
@@ -448,11 +450,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
    */
   @SuppressWarnings("unchecked")
   protected Object newMap(Object old, int size) {
-    if (old instanceof Map) {
-      ((Map) old).clear();
-      return old;
-    } else
-      return new HashMap<>(size);
+    return data.newMap(old, size);
   }
 
   /**
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java
new file mode 100644
index 0000000..40a612c
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Resolver;
+import org.apache.avro.Resolver.Action;
+import org.apache.avro.Resolver.Container;
+import org.apache.avro.Resolver.EnumAdjust;
+import org.apache.avro.Resolver.Promote;
+import org.apache.avro.Resolver.ReaderUnion;
+import org.apache.avro.Resolver.RecordAdjust;
+import org.apache.avro.Resolver.Skip;
+import org.apache.avro.Resolver.WriterUnion;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.InstanceSupplier;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.FastReaderBuilder.RecordReader.Stage;
+import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.util.WeakIdentityHashMap;
+import org.apache.avro.util.internal.Accessor;
+
+public class FastReaderBuilder {
+
+  /**
+   * Generic/SpecificData instance that contains basic functionalities like
+   * instantiation of objects
+   */
+  private final GenericData data;
+
+  /** first schema is reader schema, second is writer schema */
+  private final Map<Schema, Map<Schema, RecordReader>> readerCache = Collections
+      .synchronizedMap(new WeakIdentityHashMap<>());
+
+  private boolean keyClassEnabled = true;
+
+  private boolean classPropEnabled = true;
+
+  public static FastReaderBuilder get() {
+    return new FastReaderBuilder(GenericData.get());
+  }
+
+  public static FastReaderBuilder getSpecific() {
+    return new FastReaderBuilder(SpecificData.get());
+  }
+
+  public static boolean isSupportedData(GenericData data) {
+    return data.getClass() == GenericData.class || data.getClass() == SpecificData.class;
+  }
+
+  public FastReaderBuilder(GenericData parentData) {
+    this.data = parentData;
+  }
+
+  public FastReaderBuilder withKeyClassEnabled(boolean enabled) {
+    this.keyClassEnabled = enabled;
+    return this;
+  }
+
+  public boolean isKeyClassEnabled() {
+    return this.keyClassEnabled;
+  }
+
+  public FastReaderBuilder withClassPropEnabled(boolean enabled) {
+    this.classPropEnabled = enabled;
+    return this;
+  }
+
+  public boolean isClassPropEnabled() {
+    return this.classPropEnabled;
+  }
+
+  public <D> DatumReader<D> createDatumReader(Schema schema) throws IOException {
+    return createDatumReader(schema, schema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <D> DatumReader<D> createDatumReader(Schema writerSchema, Schema readerSchema) throws IOException {
+    Schema resolvedWriterSchema = Schema.applyAliases(writerSchema, readerSchema);
+    return (DatumReader<D>) getReaderFor(readerSchema, resolvedWriterSchema);
+  }
+
+  private FieldReader getReaderFor(Schema readerSchema, Schema writerSchema) throws IOException {
+    Action resolvedAction = Resolver.resolve(writerSchema, readerSchema, data);
+    return getReaderFor(resolvedAction, null);
+  }
+
+  private FieldReader getReaderFor(Action action, Conversion<?> explicitConversion) throws IOException {
+    final FieldReader baseReader = getNonConvertedReader(action);
+    return applyConversions(action.reader, baseReader, explicitConversion);
+  }
+
+  private RecordReader createRecordReader(RecordAdjust action) throws IOException {
+    // record readers are created in a two-step process, first registering it, then
+    // initializing it,
+    // to prevent endless loops on recursive types
+    RecordReader recordReader = getRecordReaderFromCache(action.reader, action.writer);
+    synchronized (recordReader) {
+      // only need to initialize once
+      if (recordReader.getInitializationStage() == Stage.NEW) {
+        initializeRecordReader(recordReader, action);
+      }
+    }
+    return recordReader;
+  }
+
+  private RecordReader initializeRecordReader(RecordReader recordReader, RecordAdjust action) throws IOException {
+    recordReader.startInitialization();
+
+    // generate supplier for the new object instances
+    Object testInstance = action.instanceSupplier.newInstance(null, action.reader);
+    IntFunction<Conversion<?>> conversionSupplier = getConversionSupplier(testInstance);
+
+    ExecutionStep[] readSteps = new ExecutionStep[action.fieldActions.length + action.readerOrder.length
+        - action.firstDefault];
+
+    int i = 0;
+    int fieldCounter = 0;
+    // compute what to do with writer's fields
+    for (; i < action.fieldActions.length; i++) {
+      Action fieldAction = action.fieldActions[i];
+      if (fieldAction instanceof Skip) {
+        readSteps[i] = (r, decoder) -> GenericDatumReader.skip(fieldAction.writer, decoder);
+      } else {
+        Field readerField = action.readerOrder[fieldCounter++];
+        Conversion<?> conversion = conversionSupplier.apply(readerField.pos());
+        FieldReader reader = getReaderFor(fieldAction, conversion);
+        readSteps[i] = createFieldSetter(readerField, reader);
+      }
+    }
+
+    // add defaulting if required
+    for (; i < readSteps.length; i++) {
+      readSteps[i] = getDefaultingStep(action.readerOrder[fieldCounter++]);
+    }
+
+    recordReader.finishInitialization(readSteps, action.reader, action.instanceSupplier);
+    return recordReader;
+  }
+
+  private ExecutionStep createFieldSetter(Field field, FieldReader reader) {
+    int pos = field.pos();
+    if (reader.canReuse()) {
+      return (object, decoder) -> {
+        IndexedRecord record = (IndexedRecord) object;
+        record.put(pos, reader.read(record.get(pos), decoder));
+      };
+    } else {
+      return (object, decoder) -> {
+        IndexedRecord record = (IndexedRecord) object;
+        record.put(pos, reader.read(null, decoder));
+      };
+    }
+  }
+
+  private ExecutionStep getDefaultingStep(Schema.Field field) throws IOException {
+    Object defaultValue = data.getDefaultValue(field);
+
+    if (isObjectImmutable(defaultValue)) {
+      return createFieldSetter(field, (old, d) -> defaultValue);
+    } else if (defaultValue instanceof Utf8) {
+      return createFieldSetter(field, reusingReader((old, d) -> readUtf8(old, (Utf8) defaultValue)));
+    } else if (defaultValue instanceof List && ((List<?>) defaultValue).isEmpty()) {
+      return createFieldSetter(field, reusingReader((old, d) -> data.newArray(old, 0, field.schema())));
+    } else if (defaultValue instanceof Map && ((Map<?, ?>) defaultValue).isEmpty()) {
+      return createFieldSetter(field, reusingReader((old, d) -> data.newMap(old, 0)));
+    } else {
+      DatumReader<Object> datumReader = createDatumReader(field.schema());
+      byte[] encoded = getEncodedValue(field);
+      FieldReader fieldReader = reusingReader(
+          (old, decoder) -> datumReader.read(old, DecoderFactory.get().binaryDecoder(encoded, null)));
+      return createFieldSetter(field, fieldReader);
+    }
+  }
+
+  private boolean isObjectImmutable(Object object) {
+    return object == null || object instanceof Number || object instanceof String || object instanceof GenericEnumSymbol
+        || object.getClass().isEnum();
+  }
+
+  private Utf8 readUtf8(Object reuse, Utf8 newValue) {
+    if (reuse instanceof Utf8) {
+      Utf8 oldUtf8 = (Utf8) reuse;
+      oldUtf8.set(newValue);
+      return oldUtf8;
+    } else {
+      return new Utf8(newValue);
+    }
+  }
+
+  private byte[] getEncodedValue(Field field) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+
+    ResolvingGrammarGenerator.encode(encoder, field.schema(), Accessor.defaultValue(field));
+    encoder.flush();
+
+    return out.toByteArray();
+  }
+
+  private IntFunction<Conversion<?>> getConversionSupplier(Object record) {
+    if (record instanceof SpecificRecordBase) {
+      return ((SpecificRecordBase) record)::getConversion;
+    } else {
+      return index -> null;
+    }
+  }
+
+  private RecordReader getRecordReaderFromCache(Schema readerSchema, Schema writerSchema) {
+    return readerCache.computeIfAbsent(readerSchema, k -> new WeakIdentityHashMap<>()).computeIfAbsent(writerSchema,
+        k -> new RecordReader());
+  }
+
+  private FieldReader applyConversions(Schema readerSchema, FieldReader reader, Conversion<?> explicitConversion) {
+    Conversion<?> conversion = explicitConversion;
+
+    if (conversion == null) {
+      if (readerSchema.getLogicalType() == null) {
+        return reader;
+      }
+      conversion = data.getConversionFor(readerSchema.getLogicalType());
+      if (conversion == null) {
+        return reader;
+      }
+    }
+
+    Conversion<?> finalConversion = conversion;
+    return (old, decoder) -> Conversions.convertToLogicalType(reader.read(old, decoder), readerSchema,
+        readerSchema.getLogicalType(), finalConversion);
+  }
+
+  private FieldReader getNonConvertedReader(Action action) throws IOException {
+    switch (action.type) {
+    case CONTAINER:
+      switch (action.reader.getType()) {
+      case MAP:
+        return createMapReader(action.reader, (Container) action);
+      case ARRAY:
+        return createArrayReader(action.reader, (Container) action);
+      default:
+        throw new IllegalStateException("Error getting reader for action type " + action.getClass());
+      }
+    case DO_NOTHING:
+      return getReaderForBaseType(action.reader, action.writer);
+    case RECORD:
+      return createRecordReader((RecordAdjust) action);
+    case ENUM:
+      return createEnumReader((EnumAdjust) action);
+    case PROMOTE:
+      return createPromotingReader((Promote) action);
+    case WRITER_UNION:
+      return createUnionReader((WriterUnion) action);
+    case READER_UNION:
+      return getReaderFor(((ReaderUnion) action).actualAction, null);
+    case ERROR:
+      return (old, decoder) -> {
+        throw new AvroTypeException(action.toString());
+      };
+    default:
+      throw new IllegalStateException("Error getting reader for action type " + action.getClass());
+    }
+  }
+
+  private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchema) throws IOException {
+    switch (readerSchema.getType()) {
+    case NULL:
+      return (old, decoder) -> {
+        decoder.readNull();
+        return null;
+      };
+    case BOOLEAN:
+      return (old, decoder) -> decoder.readBoolean();
+    case STRING:
+      return createStringReader(readerSchema, writerSchema);
+    case INT:
+      return (old, decoder) -> decoder.readInt();
+    case LONG:
+      return (old, decoder) -> decoder.readLong();
+    case FLOAT:
+      return (old, decoder) -> decoder.readFloat();
+    case DOUBLE:
+      return (old, decoder) -> decoder.readDouble();
+    case BYTES:
+      return createBytesReader();
+    case FIXED:
+      return createFixedReader(readerSchema, writerSchema);
+    case RECORD: // covered by action type
+    case UNION: // covered by action type
+    case ENUM: // covered by action type
+    case MAP: // covered by action type
+    case ARRAY: // covered by action type
+    default:
+      throw new IllegalStateException("Error getting reader for type " + readerSchema.getFullName());
+    }
+  }
+
+  private FieldReader createPromotingReader(Promote promote) throws IOException {
+    switch (promote.reader.getType()) {
+    case BYTES:
+      return (reuse, decoder) -> ByteBuffer.wrap(decoder.readString(null).getBytes());
+    case STRING:
+      return createBytesPromotingToStringReader(promote.reader);
+    case LONG:
+      return (reuse, decoder) -> (long) decoder.readInt();
+    case FLOAT:
+      switch (promote.writer.getType()) {
+      case INT:
+        return (reuse, decoder) -> (float) decoder.readInt();
+      case LONG:
+        return (reuse, decoder) -> (float) decoder.readLong();
+      default:
+      }
+      break;
+    case DOUBLE:
+      switch (promote.writer.getType()) {
+      case INT:
+        return (reuse, decoder) -> (double) decoder.readInt();
+      case LONG:
+        return (reuse, decoder) -> (double) decoder.readLong();
+      case FLOAT:
+        return (reuse, decoder) -> (double) decoder.readFloat();
+      default:
+      }
+      break;
+    default:
+    }
+    throw new IllegalStateException(
+        "No promotion possible for type " + promote.writer.getType() + " to " + promote.reader.getType());
+  }
+
+  private FieldReader createStringReader(Schema readerSchema, Schema writerSchema) {
+    FieldReader stringReader = createSimpleStringReader(readerSchema);
+    if (isClassPropEnabled()) {
+      return getTransformingStringReader(readerSchema.getProp(SpecificData.CLASS_PROP), stringReader);
+    } else {
+      return stringReader;
+    }
+  }
+
+  private FieldReader createSimpleStringReader(Schema readerSchema) {
+    String stringProperty = readerSchema.getProp(GenericData.STRING_PROP);
+    if (GenericData.StringType.String.name().equals(stringProperty)) {
+      return (old, decoder) -> decoder.readString();
+    } else {
+      return (old, decoder) -> decoder.readString(old instanceof Utf8 ? (Utf8) old : null);
+    }
+  }
+
+  private FieldReader createBytesPromotingToStringReader(Schema readerSchema) {
+    String stringProperty = readerSchema.getProp(GenericData.STRING_PROP);
+    if (GenericData.StringType.String.name().equals(stringProperty)) {
+      return (old, decoder) -> getStringFromByteBuffer(decoder.readBytes(null));
+    } else {
+      return (old, decoder) -> getUtf8FromByteBuffer(old, decoder.readBytes(null));
+    }
+  }
+
+  private String getStringFromByteBuffer(ByteBuffer buffer) {
+    return new String(buffer.array(), buffer.position(), buffer.remaining(), StandardCharsets.UTF_8);
+  }
+
+  private Utf8 getUtf8FromByteBuffer(Object old, ByteBuffer buffer) {
+    return (old instanceof Utf8) ? ((Utf8) old).set(new Utf8(buffer.array())) : new Utf8(buffer.array());
+  }
+
+  private FieldReader createUnionReader(WriterUnion action) throws IOException {
+    FieldReader[] unionReaders = new FieldReader[action.actions.length];
+    for (int i = 0; i < action.actions.length; i++) {
+      unionReaders[i] = getReaderFor(action.actions[i], null);
+    }
+    return createUnionReader(unionReaders);
+  }
+
+  private FieldReader createUnionReader(FieldReader[] unionReaders) {
+    return reusingReader((reuse, decoder) -> {
+      final int selection = decoder.readIndex();
+      return unionReaders[selection].read(null, decoder);
+    });
+
+  }
+
+  private FieldReader createMapReader(Schema readerSchema, Container action) throws IOException {
+    FieldReader keyReader = createMapKeyReader(readerSchema);
+    FieldReader valueReader = getReaderFor(action.elementAction, null);
+    return new MapReader(keyReader, valueReader);
+  }
+
+  private FieldReader createMapKeyReader(Schema readerSchema) {
+    FieldReader stringReader = createSimpleStringReader(readerSchema);
+    if (isKeyClassEnabled()) {
+      return getTransformingStringReader(readerSchema.getProp(SpecificData.KEY_CLASS_PROP),
+          createSimpleStringReader(readerSchema));
+    } else {
+      return stringReader;
+    }
+  }
+
+  private FieldReader getTransformingStringReader(String valueClass, FieldReader stringReader) {
+    if (valueClass == null) {
+      return stringReader;
+    } else {
+      Function<String, ?> transformer = findClass(valueClass)
+          .map(clazz -> ReflectionUtils.getConstructorAsFunction(String.class, clazz)).orElse(null);
+      if (transformer != null) {
+        return (old, decoder) -> transformer.apply((String) stringReader.read(null, decoder));
+      }
+    }
+
+    return stringReader;
+  }
+
+  private Optional<Class<?>> findClass(String clazz) {
+    try {
+      return Optional.of(data.getClassLoader().loadClass(clazz));
+    } catch (ReflectiveOperationException e) {
+      return Optional.empty();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private FieldReader createArrayReader(Schema readerSchema, Container action) throws IOException {
+    FieldReader elementReader = getReaderFor(action.elementAction, null);
+
+    return reusingReader((reuse, decoder) -> {
+      if (reuse instanceof GenericArray) {
+        GenericArray<Object> reuseArray = (GenericArray<Object>) reuse;
+        long l = decoder.readArrayStart();
+        reuseArray.clear();
+
+        while (l > 0) {
+          for (long i = 0; i < l; i++) {
+            reuseArray.add(elementReader.read(reuseArray.peek(), decoder));
+          }
+          l = decoder.arrayNext();
+        }
+        return reuseArray;
+      } else {
+        long l = decoder.readArrayStart();
+        List<Object> array = (reuse instanceof List) ? (List<Object>) reuse
+            : new GenericData.Array<>((int) l, readerSchema);
+        array.clear();
+        while (l > 0) {
+          for (long i = 0; i < l; i++) {
+            array.add(elementReader.read(null, decoder));
+          }
+          l = decoder.arrayNext();
+        }
+        return array;
+      }
+    });
+  }
+
+  private FieldReader createEnumReader(EnumAdjust action) {
+    return reusingReader((reuse, decoder) -> {
+      int index = decoder.readEnum();
+      Object resultObject = action.values[index];
+      if (resultObject == null) {
+        throw new AvroTypeException("No match for " + action.writer.getEnumSymbols().get(index));
+      }
+      return resultObject;
+    });
+  }
+
+  private FieldReader createFixedReader(Schema readerSchema, Schema writerSchema) {
+    return reusingReader((reuse, decoder) -> {
+      GenericFixed fixed = (GenericFixed) data.createFixed(reuse, readerSchema);
+      decoder.readFixed(fixed.bytes(), 0, readerSchema.getFixedSize());
+      return fixed;
+    });
+  }
+
+  private FieldReader createBytesReader() {
+    return reusingReader(
+        (reuse, decoder) -> decoder.readBytes(reuse instanceof ByteBuffer ? (ByteBuffer) reuse : null));
+  }
+
+  public static FieldReader reusingReader(ReusingFieldReader reader) {
+    return reader;
+  }
+
+  public interface FieldReader extends DatumReader<Object> {
+    @Override
+    public Object read(Object reuse, Decoder decoder) throws IOException;
+
+    public default boolean canReuse() {
+      return false;
+    }
+
+    @Override
+    default void setSchema(Schema schema) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public interface ReusingFieldReader extends FieldReader {
+    @Override
+    public default boolean canReuse() {
+      return true;
+    }
+  }
+
+  public static class RecordReader implements FieldReader {
+    public enum Stage {
+      NEW, INITIALIZING, INITIALIZED
+    }
+
+    private ExecutionStep[] readSteps;
+    private InstanceSupplier supplier;
+    private Schema schema;
+    private Stage stage = Stage.NEW;
+
+    public Stage getInitializationStage() {
+      return this.stage;
+    }
+
+    public void reset() {
+      this.stage = Stage.NEW;
+    }
+
+    public void startInitialization() {
+      this.stage = Stage.INITIALIZING;
+    }
+
+    public void finishInitialization(ExecutionStep[] readSteps, Schema schema, InstanceSupplier supp) {
+      this.readSteps = readSteps;
+      this.schema = schema;
+      this.supplier = supp;
+      this.stage = Stage.INITIALIZED;
+    }
+
+    @Override
+    public boolean canReuse() {
+      return true;
+    }
+
+    @Override
+    public Object read(Object reuse, Decoder decoder) throws IOException {
+      Object object = supplier.newInstance(reuse, schema);
+      for (ExecutionStep thisStep : readSteps) {
+        thisStep.execute(object, decoder);
+      }
+      return object;
+    }
+  }
+
+  public static class MapReader implements FieldReader {
+
+    private final FieldReader keyReader;
+    private final FieldReader valueReader;
+
+    public MapReader(FieldReader keyReader, FieldReader valueReader) {
+      this.keyReader = keyReader;
+      this.valueReader = valueReader;
+    }
+
+    @Override
+    public Object read(Object reuse, Decoder decoder) throws IOException {
+      long l = decoder.readMapStart();
+      Map<Object, Object> targetMap = new HashMap<>();
+
+      while (l > 0) {
+        for (int i = 0; i < l; i++) {
+          Object key = keyReader.read(null, decoder);
+          Object value = valueReader.read(null, decoder);
+          targetMap.put(key, value);
+        }
+        l = decoder.mapNext();
+      }
+
+      return targetMap;
+    }
+  }
+
+  public interface ExecutionStep {
+    public void execute(Object record, Decoder decoder) throws IOException;
+  }
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ReflectionUtils.java b/lang/java/avro/src/main/java/org/apache/avro/io/ReflectionUtils.java
new file mode 100644
index 0000000..53faec0
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/ReflectionUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.lang.invoke.CallSite;
+import java.lang.invoke.LambdaMetafactory;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class ReflectionUtils {
+
+  private ReflectionUtils() {
+    // static helper class, don't initiate
+  }
+
+  public static <D> Supplier<D> getConstructorAsSupplier(Class<D> clazz) {
+    try {
+      MethodHandles.Lookup lookup = MethodHandles.lookup();
+      MethodHandle constructorHandle = lookup.findConstructor(clazz, MethodType.methodType(void.class));
+
+      CallSite site = LambdaMetafactory.metafactory(lookup, "get", MethodType.methodType(Supplier.class),
+          constructorHandle.type().generic(), constructorHandle, constructorHandle.type());
+
+      return (Supplier<D>) site.getTarget().invokeExact();
+    } catch (Throwable t) {
+      // if anything goes wrong, don't provide a Supplier
+      return null;
+    }
+  }
+
+  public static <V, R> Supplier<R> getOneArgConstructorAsSupplier(Class<R> clazz, Class<V> argumentClass, V argument) {
+    Function<V, R> supplierFunction = getConstructorAsFunction(argumentClass, clazz);
+    if (supplierFunction != null) {
+      return () -> supplierFunction.apply(argument);
+    } else {
+      return null;
+    }
+  }
+
+  public static <V, R> Function<V, R> getConstructorAsFunction(Class<V> parameterClass, Class<R> clazz) {
+    try {
+      MethodHandles.Lookup lookup = MethodHandles.lookup();
+      MethodHandle constructorHandle = lookup.findConstructor(clazz, MethodType.methodType(void.class, parameterClass));
+
+      CallSite site = LambdaMetafactory.metafactory(lookup, "apply", MethodType.methodType(Function.class),
+          constructorHandle.type().generic(), constructorHandle, constructorHandle.type());
+
+      return (Function<V, R>) site.getTarget().invokeExact();
+    } catch (Throwable t) {
+      // if something goes wrong, do not provide a Function instance
+      return null;
+    }
+  }
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
index fd02b46..1e2fba4 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
@@ -251,7 +251,7 @@ public class ResolvingGrammarGenerator extends ValidatingGrammarGenerator {
    * @param n The Json node to encode.
    * @throws IOException
    */
-  static void encode(Encoder e, Schema s, JsonNode n) throws IOException {
+  public static void encode(Encoder e, Schema s, JsonNode n) throws IOException {
     switch (s.getType()) {
     case RECORD:
       for (Field f : s.getFields()) {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
index 98667cd..bebdfc6 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
@@ -117,7 +117,7 @@ public class SpecificData extends GenericData {
 
   @Override
   public DatumReader createDatumReader(Schema schema) {
-    return new SpecificDatumReader(schema, schema, this);
+    return createDatumReader(schema, schema);
   }
 
   @Override
@@ -491,10 +491,36 @@ public class SpecificData extends GenericData {
     return (c.isInstance(old) ? old : newInstance(c, schema));
   }
 
+  @SuppressWarnings("rawtypes")
+  @Override
+  /**
+   * Create an InstanceSupplier that caches all information required for the
+   * creation of a schema record instance rather than having to look them up for
+   * each call (as newRecord would)
+   */
+  public InstanceSupplier getNewRecordSupplier(Schema schema) {
+    Class c = getClass(schema);
+    if (c == null) {
+      return super.getNewRecordSupplier(schema);
+    }
+
+    boolean useSchema = SchemaConstructable.class.isAssignableFrom(c);
+    Constructor meth = (Constructor) CTOR_CACHE.get(c);
+    Object[] params = useSchema ? new Object[] { schema } : (Object[]) null;
+
+    return (old, sch) -> {
+      try {
+        return c.isInstance(old) ? old : meth.newInstance(params);
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
   /**
    * Tag interface that indicates that a class has a one-argument constructor that
    * accepts a Schema.
-   * 
+   *
    * @see #newInstance
    */
   public interface SchemaConstructable {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
index e0ec85c..883452c 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
@@ -80,7 +80,7 @@ public class Utf8 implements Comparable<Utf8>, CharSequence {
 
   /**
    * Return length in bytes.
-   * 
+   *
    * @deprecated call {@link #getByteLength()} instead.
    */
   @Deprecated
@@ -96,7 +96,7 @@ public class Utf8 implements Comparable<Utf8>, CharSequence {
   /**
    * Set length in bytes. Should called whenever byte content changes, even if the
    * length does not change, as this also clears the cached String.
-   * 
+   *
    * @deprecated call {@link #setByteLength(int)} instead.
    */
   @Deprecated
@@ -130,6 +130,16 @@ public class Utf8 implements Comparable<Utf8>, CharSequence {
     return this;
   }
 
+  public Utf8 set(Utf8 other) {
+    if (this.bytes.length < other.length) {
+      this.bytes = new byte[other.length];
+    }
+    this.length = other.length;
+    System.arraycopy(other.bytes, 0, bytes, 0, length);
+    this.string = other.string;
+    return this;
+  }
+
   @Override
   public String toString() {
     if (this.length == 0)