You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/08 16:02:23 UTC

[1/3] incubator-beam git commit: Changes in AvroCoder serialization so it can serialize in Kryo

Repository: incubator-beam
Updated Branches:
  refs/heads/master afa0c31bd -> bfc527d63


Changes in AvroCoder serialization so it can serialize in Kryo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06c18468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06c18468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06c18468

Branch: refs/heads/master
Commit: 06c1846860176cc2bd971f8ad7037c97594af866
Parents: afa0c31
Author: Aviem Zur <av...@gmail.com>
Authored: Thu Sep 8 11:21:41 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:47:34 2016 -0800

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |   7 ++
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 126 +++++++++++--------
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  33 +++++
 3 files changed, 112 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 17ef193..c7b46d8 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -473,5 +473,12 @@
       <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.21</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 7894d14..4f0239e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -164,7 +163,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   };
 
   private final Class<T> type;
-  private final transient Schema schema;
+  private transient Schema schema;
+
+  private final String schemaStr;
 
   private final List<String> nonDeterministicReasons;
 
@@ -174,36 +175,16 @@ public class AvroCoder<T> extends StandardCoder<T> {
   // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
   // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
   // an inner coder.
-  private final transient ThreadLocal<BinaryDecoder> decoder;
-  private final transient ThreadLocal<BinaryEncoder> encoder;
-  private final transient ThreadLocal<DatumWriter<T>> writer;
-  private final transient ThreadLocal<DatumReader<T>> reader;
+  private transient ThreadLocal<BinaryDecoder> memoizedDecoder;
+  private transient ThreadLocal<BinaryEncoder> memoizedEncoder;
+  private transient ThreadLocal<DatumWriter<T>> memoizedWriter;
+  private transient ThreadLocal<DatumReader<T>> memoizedReader;
 
   protected AvroCoder(Class<T> type, Schema schema) {
     this.type = type;
     this.schema = schema;
-
+    this.schemaStr = schema.toString();
     nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
-
-    // Decoder and Encoder start off null for each thread. They are allocated and potentially
-    // reused inside encode/decode.
-    this.decoder = new ThreadLocal<>();
-    this.encoder = new ThreadLocal<>();
-
-    // Reader and writer are allocated once per thread and are "final" for thread-local Coder
-    // instance.
-    this.reader = new ThreadLocal<DatumReader<T>>() {
-      @Override
-      public DatumReader<T> initialValue() {
-        return createDatumReader();
-      }
-    };
-    this.writer = new ThreadLocal<DatumWriter<T>>() {
-      @Override
-      public DatumWriter<T> initialValue() {
-        return createDatumWriter();
-      }
-    };
   }
 
   /**
@@ -246,33 +227,29 @@ public class AvroCoder<T> extends StandardCoder<T> {
     return type;
   }
 
-  private Object writeReplace() {
-    // When serialized by Java, instances of AvroCoder should be replaced by
-    // a SerializedAvroCoderProxy.
-    return new SerializedAvroCoderProxy<>(type, schema.toString());
-  }
-
   @Override
   public void encode(T value, OutputStream outStream, Context context) throws IOException {
     // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
+    ThreadLocal<BinaryEncoder> encoder = getEncoder();
     BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
     // Save the potentially-new instance for reuse later.
     encoder.set(encoderInstance);
-    writer.get().write(value, encoderInstance);
+    getWriter().get().write(value, encoderInstance);
     // Direct binary encoder does not buffer any data and need not be flushed.
   }
 
   @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
+    ThreadLocal<BinaryDecoder> decoder = getDecoder();
     BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
     // Save the potentially-new instance for later.
     decoder.set(decoderInstance);
-    return reader.get().read(null, decoderInstance);
+    return getReader().get().read(null, decoderInstance);
   }
 
   @Override
-    public List<? extends Coder<?>> getCoderArguments() {
+  public List<? extends Coder<?>> getCoderArguments() {
     return null;
   }
 
@@ -280,7 +257,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
   public CloudObject asCloudObject() {
     CloudObject result = super.asCloudObject();
     addString(result, "type", type.getName());
-    addString(result, "schema", schema.toString());
+    addString(result, "schema", getSchema().toString());
     return result;
   }
 
@@ -306,9 +283,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Deprecated
   public DatumReader<T> createDatumReader() {
     if (type.equals(GenericRecord.class)) {
-      return new GenericDatumReader<>(schema);
+      return new GenericDatumReader<>(getSchema());
     } else {
-      return new ReflectDatumReader<>(schema);
+      return new ReflectDatumReader<>(getSchema());
     }
   }
 
@@ -321,9 +298,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Deprecated
   public DatumWriter<T> createDatumWriter() {
     if (type.equals(GenericRecord.class)) {
-      return new GenericDatumWriter<>(schema);
+      return new GenericDatumWriter<>(getSchema());
     } else {
-      return new ReflectDatumWriter<>(schema);
+      return new ReflectDatumWriter<>(getSchema());
     }
   }
 
@@ -331,28 +308,69 @@ public class AvroCoder<T> extends StandardCoder<T> {
    * Returns the schema used by this coder.
    */
   public Schema getSchema() {
-    return schema;
+    return getMemoizedSchema();
+  }
+
+  /**
+   * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily.
+   */
+  private ThreadLocal<BinaryDecoder> getDecoder() {
+    if (memoizedDecoder == null) {
+      memoizedDecoder = new ThreadLocal<>();
+    }
+    return memoizedDecoder;
+  }
+
+  /**
+   * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily.
+   */
+  private ThreadLocal<BinaryEncoder> getEncoder() {
+    if (memoizedEncoder == null) {
+      memoizedEncoder = new ThreadLocal<>();
+    }
+    return memoizedEncoder;
   }
 
   /**
-   * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields
-   * to remain final.
+   * Get the memoized {@link DatumReader}, possibly initializing it lazily.
    */
-  private static class SerializedAvroCoderProxy<T> implements Serializable {
-    private final Class<T> type;
-    private final String schemaStr;
+  private ThreadLocal<DatumReader<T>> getReader() {
+    if (memoizedReader == null) {
+      memoizedReader = new ThreadLocal<DatumReader<T>>() {
+        @Override
+        public DatumReader<T> initialValue() {
+          return createDatumReader();
+        }
+      };
+    }
+    return memoizedReader;
+  }
 
-    public SerializedAvroCoderProxy(Class<T> type, String schemaStr) {
-      this.type = type;
-      this.schemaStr = schemaStr;
+  /**
+   * Get the memoized {@link DatumWriter}, possibly initializing it lazily.
+   */
+  private ThreadLocal<DatumWriter<T>> getWriter() {
+    if (memoizedWriter == null) {
+      memoizedWriter = new ThreadLocal<DatumWriter<T>>() {
+        @Override
+        public DatumWriter<T> initialValue() {
+          return createDatumWriter();
+        }
+      };
     }
+    return memoizedWriter;
+  }
 
-    private Object readResolve() {
-      // When deserialized, instances of this object should be replaced by
-      // constructing an AvroCoder.
+  /**
+   * Get the {@link Schema}, possibly initializing it lazily by parsing {@link
+   * AvroCoder#schemaStr}.
+   */
+  private Schema getMemoizedSchema() {
+    if (schema == null) {
       Schema.Parser parser = new Schema.Parser();
-      return new AvroCoder<T>(type, parser.parse(schemaStr));
+      schema = parser.parse(schemaStr);
     }
+    return schema;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index f6329a0..f2373d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -39,6 +39,10 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -172,6 +176,35 @@ public class AvroCoderTest {
     CoderProperties.coderDecodeEncodeEqual(copied, value);
   }
 
+  /**
+   * Confirm that we can serialize and deserialize an AvroCoder object using Kryo.
+   * (BEAM-626).
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testKryoSerialization() throws Exception {
+    Pojo value = new Pojo("Hello", 42);
+    AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+
+    //Kryo instantiation
+    Kryo kryo = new Kryo();
+    kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+
+    //Serialization of object
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output output = new Output(bos);
+    kryo.writeObject(output, coder);
+    output.close();
+
+    //De-serialization of object
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    Input input = new Input(bis);
+    AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class);
+
+    CoderProperties.coderDecodeEncodeEqual(copied, value);
+  }
+
   @Test
   public void testPojoEncoding() throws Exception {
     Pojo value = new Pojo("Hello", 42);


[3/3] incubator-beam git commit: [BEAM-626] Changes in AvroCoder serialization so it can serialize in Kryo

Posted by lc...@apache.org.
[BEAM-626] Changes in AvroCoder serialization so it can serialize in Kryo

This closes #1246


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfc527d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfc527d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfc527d6

Branch: refs/heads/master
Commit: bfc527d6355e2dfa489658a337f3ae45ade99cb9
Parents: afa0c31 fae3ec5
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 8 07:49:13 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:49:13 2016 -0800

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |   7 ++
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 124 ++++++++++++-------
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  48 +++++++
 3 files changed, 134 insertions(+), 45 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Swap to use Serializable ThreadLocal and serializable Schema holder.

Posted by lc...@apache.org.
Swap to use Serializable ThreadLocal and serializable Schema holder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fae3ec52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fae3ec52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fae3ec52

Branch: refs/heads/master
Commit: fae3ec52970f90a0174b789e99022d3095baf2a4
Parents: 06c1846
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 31 11:38:45 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:47:57 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 178 ++++++++++---------
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  45 +++--
 2 files changed, 127 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae3ec52/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 4f0239e..591f145 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -21,9 +21,13 @@ import static org.apache.beam.sdk.util.Structs.addString;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectStreamException;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -163,28 +167,104 @@ public class AvroCoder<T> extends StandardCoder<T> {
   };
 
   private final Class<T> type;
-  private transient Schema schema;
-
-  private final String schemaStr;
+  private final SerializableSchemaSupplier schemaSupplier;
 
   private final List<String> nonDeterministicReasons;
 
   // Factories allocated by .get() are thread-safe and immutable.
   private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
   private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
+
+  /**
+   * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
+   * for Kryo to serialize an {@link AvroCoder} as a final field.
+   */
+  private static class EmptyOnDeserializationThreadLocal<T>
+      extends ThreadLocal<T> implements Serializable {
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+    }
+
+    private void readObjectNoData() throws ObjectStreamException {
+    }
+  }
+
+  /**
+   * A {@link Serializable} object that holds the {@link String} version of a {@link Schema}.
+   * This is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage
+   * of the {@link #readResolve} method.
+   */
+  private static class SerializableSchemaString implements Serializable {
+    private final String schema;
+    private SerializableSchemaString(String schema) {
+      this.schema = schema;
+    }
+
+    private Object readResolve() throws IOException, ClassNotFoundException {
+      return new SerializableSchemaSupplier(Schema.parse(schema));
+    }
+  }
+
+  /**
+   * A {@link Serializable} object that delegates to the {@link SerializableSchemaString} via
+   * {@link Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize
+   * Java's serialization and hence is able to encode the {@link Schema} object directly.
+   */
+  private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
+    @SuppressFBWarnings(justification = "writeReplace makes this object serializable. This is a "
+        + "limitation of FindBugs as discussed here: http://stackoverflow.com/questions/"
+        + "26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern",
+        value = "SE_BAD_FIELD")
+    private final Schema schema;
+    private SerializableSchemaSupplier(Schema schema) {
+      this.schema = schema;
+    }
+
+    private Object writeReplace() {
+      return new SerializableSchemaString(schema.toString());
+    }
+
+    @Override
+    public Schema get() {
+      return schema;
+    }
+  }
+
   // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
   // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
   // an inner coder.
-  private transient ThreadLocal<BinaryDecoder> memoizedDecoder;
-  private transient ThreadLocal<BinaryEncoder> memoizedEncoder;
-  private transient ThreadLocal<DatumWriter<T>> memoizedWriter;
-  private transient ThreadLocal<DatumReader<T>> memoizedReader;
+  private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder;
+  private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder;
+  private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
+  private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
 
   protected AvroCoder(Class<T> type, Schema schema) {
     this.type = type;
-    this.schema = schema;
-    this.schemaStr = schema.toString();
+    this.schemaSupplier = new SerializableSchemaSupplier(schema);
     nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
+
+    // Decoder and Encoder start off null for each thread. They are allocated and potentially
+    // reused inside encode/decode.
+    this.decoder = new EmptyOnDeserializationThreadLocal<>();
+    this.encoder = new EmptyOnDeserializationThreadLocal<>();
+
+    // Reader and writer are allocated once per thread and are "final" for thread-local Coder
+    // instance.
+    this.reader = new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
+      @Override
+      public DatumReader<T> initialValue() {
+        return createDatumReader();
+      }
+    };
+    this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+      @Override
+      public DatumWriter<T> initialValue() {
+        return createDatumWriter();
+      }
+    };
   }
 
   /**
@@ -230,22 +310,20 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Override
   public void encode(T value, OutputStream outStream, Context context) throws IOException {
     // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
-    ThreadLocal<BinaryEncoder> encoder = getEncoder();
     BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
     // Save the potentially-new instance for reuse later.
     encoder.set(encoderInstance);
-    getWriter().get().write(value, encoderInstance);
+    writer.get().write(value, encoderInstance);
     // Direct binary encoder does not buffer any data and need not be flushed.
   }
 
   @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
-    ThreadLocal<BinaryDecoder> decoder = getDecoder();
     BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
     // Save the potentially-new instance for later.
     decoder.set(decoderInstance);
-    return getReader().get().read(null, decoderInstance);
+    return reader.get().read(null, decoderInstance);
   }
 
   @Override
@@ -257,7 +335,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
   public CloudObject asCloudObject() {
     CloudObject result = super.asCloudObject();
     addString(result, "type", type.getName());
-    addString(result, "schema", getSchema().toString());
+    addString(result, "schema", schemaSupplier.get().toString());
     return result;
   }
 
@@ -283,9 +361,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Deprecated
   public DatumReader<T> createDatumReader() {
     if (type.equals(GenericRecord.class)) {
-      return new GenericDatumReader<>(getSchema());
+      return new GenericDatumReader<>(schemaSupplier.get());
     } else {
-      return new ReflectDatumReader<>(getSchema());
+      return new ReflectDatumReader<>(schemaSupplier.get());
     }
   }
 
@@ -298,9 +376,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Deprecated
   public DatumWriter<T> createDatumWriter() {
     if (type.equals(GenericRecord.class)) {
-      return new GenericDatumWriter<>(getSchema());
+      return new GenericDatumWriter<>(schemaSupplier.get());
     } else {
-      return new ReflectDatumWriter<>(getSchema());
+      return new ReflectDatumWriter<>(schemaSupplier.get());
     }
   }
 
@@ -308,69 +386,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
    * Returns the schema used by this coder.
    */
   public Schema getSchema() {
-    return getMemoizedSchema();
-  }
-
-  /**
-   * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily.
-   */
-  private ThreadLocal<BinaryDecoder> getDecoder() {
-    if (memoizedDecoder == null) {
-      memoizedDecoder = new ThreadLocal<>();
-    }
-    return memoizedDecoder;
-  }
-
-  /**
-   * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily.
-   */
-  private ThreadLocal<BinaryEncoder> getEncoder() {
-    if (memoizedEncoder == null) {
-      memoizedEncoder = new ThreadLocal<>();
-    }
-    return memoizedEncoder;
-  }
-
-  /**
-   * Get the memoized {@link DatumReader}, possibly initializing it lazily.
-   */
-  private ThreadLocal<DatumReader<T>> getReader() {
-    if (memoizedReader == null) {
-      memoizedReader = new ThreadLocal<DatumReader<T>>() {
-        @Override
-        public DatumReader<T> initialValue() {
-          return createDatumReader();
-        }
-      };
-    }
-    return memoizedReader;
-  }
-
-  /**
-   * Get the memoized {@link DatumWriter}, possibly initializing it lazily.
-   */
-  private ThreadLocal<DatumWriter<T>> getWriter() {
-    if (memoizedWriter == null) {
-      memoizedWriter = new ThreadLocal<DatumWriter<T>>() {
-        @Override
-        public DatumWriter<T> initialValue() {
-          return createDatumWriter();
-        }
-      };
-    }
-    return memoizedWriter;
-  }
-
-  /**
-   * Get the {@link Schema}, possibly initializing it lazily by parsing {@link
-   * AvroCoder#schemaStr}.
-   */
-  private Schema getMemoizedSchema() {
-    if (schema == null) {
-      Schema.Parser parser = new Schema.Parser();
-      schema = parser.parse(schemaStr);
-    }
-    return schema;
+    return schemaSupplier.get();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae3ec52/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index f2373d1..adfa0d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
@@ -39,10 +42,6 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -78,6 +77,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 /** Tests for {@link AvroCoder}. */
 @RunWith(JUnit4.class)
@@ -189,20 +189,35 @@ public class AvroCoderTest {
 
     //Kryo instantiation
     Kryo kryo = new Kryo();
-    kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
 
-    //Serialization of object
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    Output output = new Output(bos);
-    kryo.writeObject(output, coder);
-    output.close();
+    //Serialization of object without any memoization
+    ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream();
+    try (Output output = new Output(coderWithoutMemoizationBos)) {
+      kryo.writeObject(output, coder);
+    }
 
-    //De-serialization of object
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    Input input = new Input(bis);
-    AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class);
+    // Force thread local memoization to store values.
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
 
-    CoderProperties.coderDecodeEncodeEqual(copied, value);
+    // Serialization of object with memoized fields
+    ByteArrayOutputStream coderWithMemoizationBos = new ByteArrayOutputStream();
+    try (Output output = new Output(coderWithMemoizationBos)) {
+      kryo.writeObject(output, coder);
+    }
+
+    // Copy empty and memoized variants of the Coder
+    ByteArrayInputStream bisWithoutMemoization =
+        new ByteArrayInputStream(coderWithoutMemoizationBos.toByteArray());
+    AvroCoder<Pojo> copiedWithoutMemoization =
+        (AvroCoder<Pojo>) kryo.readObject(new Input(bisWithoutMemoization), AvroCoder.class);
+    ByteArrayInputStream bisWithMemoization =
+        new ByteArrayInputStream(coderWithMemoizationBos.toByteArray());
+    AvroCoder<Pojo> copiedWithMemoization =
+        (AvroCoder<Pojo>) kryo.readObject(new Input(bisWithMemoization), AvroCoder.class);
+
+    CoderProperties.coderDecodeEncodeEqual(copiedWithoutMemoization, value);
+    CoderProperties.coderDecodeEncodeEqual(copiedWithMemoization, value);
   }
 
   @Test