You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/08 16:02:23 UTC
[1/3] incubator-beam git commit: Changes in AvroCoder serialization so it can serialize in Kryo
Repository: incubator-beam
Updated Branches:
refs/heads/master afa0c31bd -> bfc527d63
Changes in AvroCoder serialization so it can serialize in Kryo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06c18468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06c18468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06c18468
Branch: refs/heads/master
Commit: 06c1846860176cc2bd971f8ad7037c97594af866
Parents: afa0c31
Author: Aviem Zur <av...@gmail.com>
Authored: Thu Sep 8 11:21:41 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:47:34 2016 -0800
----------------------------------------------------------------------
sdks/java/core/pom.xml | 7 ++
.../org/apache/beam/sdk/coders/AvroCoder.java | 126 +++++++++++--------
.../apache/beam/sdk/coders/AvroCoderTest.java | 33 +++++
3 files changed, 112 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 17ef193..c7b46d8 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -473,5 +473,12 @@
<artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.21</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 7894d14..4f0239e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
@@ -164,7 +163,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
};
private final Class<T> type;
- private final transient Schema schema;
+ private transient Schema schema;
+
+ private final String schemaStr;
private final List<String> nonDeterministicReasons;
@@ -174,36 +175,16 @@ public class AvroCoder<T> extends StandardCoder<T> {
// Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
// these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
// an inner coder.
- private final transient ThreadLocal<BinaryDecoder> decoder;
- private final transient ThreadLocal<BinaryEncoder> encoder;
- private final transient ThreadLocal<DatumWriter<T>> writer;
- private final transient ThreadLocal<DatumReader<T>> reader;
+ private transient ThreadLocal<BinaryDecoder> memoizedDecoder;
+ private transient ThreadLocal<BinaryEncoder> memoizedEncoder;
+ private transient ThreadLocal<DatumWriter<T>> memoizedWriter;
+ private transient ThreadLocal<DatumReader<T>> memoizedReader;
protected AvroCoder(Class<T> type, Schema schema) {
this.type = type;
this.schema = schema;
-
+ this.schemaStr = schema.toString();
nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
-
- // Decoder and Encoder start off null for each thread. They are allocated and potentially
- // reused inside encode/decode.
- this.decoder = new ThreadLocal<>();
- this.encoder = new ThreadLocal<>();
-
- // Reader and writer are allocated once per thread and are "final" for thread-local Coder
- // instance.
- this.reader = new ThreadLocal<DatumReader<T>>() {
- @Override
- public DatumReader<T> initialValue() {
- return createDatumReader();
- }
- };
- this.writer = new ThreadLocal<DatumWriter<T>>() {
- @Override
- public DatumWriter<T> initialValue() {
- return createDatumWriter();
- }
- };
}
/**
@@ -246,33 +227,29 @@ public class AvroCoder<T> extends StandardCoder<T> {
return type;
}
- private Object writeReplace() {
- // When serialized by Java, instances of AvroCoder should be replaced by
- // a SerializedAvroCoderProxy.
- return new SerializedAvroCoderProxy<>(type, schema.toString());
- }
-
@Override
public void encode(T value, OutputStream outStream, Context context) throws IOException {
// Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
+ ThreadLocal<BinaryEncoder> encoder = getEncoder();
BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
// Save the potentially-new instance for reuse later.
encoder.set(encoderInstance);
- writer.get().write(value, encoderInstance);
+ getWriter().get().write(value, encoderInstance);
// Direct binary encoder does not buffer any data and need not be flushed.
}
@Override
public T decode(InputStream inStream, Context context) throws IOException {
// Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
+ ThreadLocal<BinaryDecoder> decoder = getDecoder();
BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
// Save the potentially-new instance for later.
decoder.set(decoderInstance);
- return reader.get().read(null, decoderInstance);
+ return getReader().get().read(null, decoderInstance);
}
@Override
- public List<? extends Coder<?>> getCoderArguments() {
+ public List<? extends Coder<?>> getCoderArguments() {
return null;
}
@@ -280,7 +257,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
public CloudObject asCloudObject() {
CloudObject result = super.asCloudObject();
addString(result, "type", type.getName());
- addString(result, "schema", schema.toString());
+ addString(result, "schema", getSchema().toString());
return result;
}
@@ -306,9 +283,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Deprecated
public DatumReader<T> createDatumReader() {
if (type.equals(GenericRecord.class)) {
- return new GenericDatumReader<>(schema);
+ return new GenericDatumReader<>(getSchema());
} else {
- return new ReflectDatumReader<>(schema);
+ return new ReflectDatumReader<>(getSchema());
}
}
@@ -321,9 +298,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Deprecated
public DatumWriter<T> createDatumWriter() {
if (type.equals(GenericRecord.class)) {
- return new GenericDatumWriter<>(schema);
+ return new GenericDatumWriter<>(getSchema());
} else {
- return new ReflectDatumWriter<>(schema);
+ return new ReflectDatumWriter<>(getSchema());
}
}
@@ -331,28 +308,69 @@ public class AvroCoder<T> extends StandardCoder<T> {
* Returns the schema used by this coder.
*/
public Schema getSchema() {
- return schema;
+ return getMemoizedSchema();
+ }
+
+ /**
+ * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily.
+ */
+ private ThreadLocal<BinaryDecoder> getDecoder() {
+ if (memoizedDecoder == null) {
+ memoizedDecoder = new ThreadLocal<>();
+ }
+ return memoizedDecoder;
+ }
+
+ /**
+ * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily.
+ */
+ private ThreadLocal<BinaryEncoder> getEncoder() {
+ if (memoizedEncoder == null) {
+ memoizedEncoder = new ThreadLocal<>();
+ }
+ return memoizedEncoder;
}
/**
- * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields
- * to remain final.
+ * Get the memoized {@link DatumReader}, possibly initializing it lazily.
*/
- private static class SerializedAvroCoderProxy<T> implements Serializable {
- private final Class<T> type;
- private final String schemaStr;
+ private ThreadLocal<DatumReader<T>> getReader() {
+ if (memoizedReader == null) {
+ memoizedReader = new ThreadLocal<DatumReader<T>>() {
+ @Override
+ public DatumReader<T> initialValue() {
+ return createDatumReader();
+ }
+ };
+ }
+ return memoizedReader;
+ }
- public SerializedAvroCoderProxy(Class<T> type, String schemaStr) {
- this.type = type;
- this.schemaStr = schemaStr;
+ /**
+ * Get the memoized {@link DatumWriter}, possibly initializing it lazily.
+ */
+ private ThreadLocal<DatumWriter<T>> getWriter() {
+ if (memoizedWriter == null) {
+ memoizedWriter = new ThreadLocal<DatumWriter<T>>() {
+ @Override
+ public DatumWriter<T> initialValue() {
+ return createDatumWriter();
+ }
+ };
}
+ return memoizedWriter;
+ }
- private Object readResolve() {
- // When deserialized, instances of this object should be replaced by
- // constructing an AvroCoder.
+ /**
+ * Get the {@link Schema}, possibly initializing it lazily by parsing {@link
+ * AvroCoder#schemaStr}.
+ */
+ private Schema getMemoizedSchema() {
+ if (schema == null) {
Schema.Parser parser = new Schema.Parser();
- return new AvroCoder<T>(type, parser.parse(schemaStr));
+ schema = parser.parse(schemaStr);
}
+ return schema;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index f6329a0..f2373d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -39,6 +39,10 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -172,6 +176,35 @@ public class AvroCoderTest {
CoderProperties.coderDecodeEncodeEqual(copied, value);
}
+ /**
+ * Confirm that we can serialize and deserialize an AvroCoder object using Kryo.
+ * (BEAM-626).
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testKryoSerialization() throws Exception {
+ Pojo value = new Pojo("Hello", 42);
+ AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+
+ //Kryo instantiation
+ Kryo kryo = new Kryo();
+ kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+
+ //Serialization of object
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Output output = new Output(bos);
+ kryo.writeObject(output, coder);
+ output.close();
+
+ //De-serialization of object
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ Input input = new Input(bis);
+ AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class);
+
+ CoderProperties.coderDecodeEncodeEqual(copied, value);
+ }
+
@Test
public void testPojoEncoding() throws Exception {
Pojo value = new Pojo("Hello", 42);
[3/3] incubator-beam git commit: [BEAM-626] Changes in AvroCoder serialization so it can serialize in Kryo
Posted by lc...@apache.org.
[BEAM-626] Changes in AvroCoder serialization so it can serialize in Kryo
This closes #1246
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfc527d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfc527d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfc527d6
Branch: refs/heads/master
Commit: bfc527d6355e2dfa489658a337f3ae45ade99cb9
Parents: afa0c31 fae3ec5
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 8 07:49:13 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:49:13 2016 -0800
----------------------------------------------------------------------
sdks/java/core/pom.xml | 7 ++
.../org/apache/beam/sdk/coders/AvroCoder.java | 124 ++++++++++++-------
.../apache/beam/sdk/coders/AvroCoderTest.java | 48 +++++++
3 files changed, 134 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Swap to use Serializable ThreadLocal and serializable Schema holder.
Posted by lc...@apache.org.
Swap to use Serializable ThreadLocal and serializable Schema holder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fae3ec52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fae3ec52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fae3ec52
Branch: refs/heads/master
Commit: fae3ec52970f90a0174b789e99022d3095baf2a4
Parents: 06c1846
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 31 11:38:45 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:47:57 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 178 ++++++++++---------
.../apache/beam/sdk/coders/AvroCoderTest.java | 45 +++--
2 files changed, 127 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae3ec52/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 4f0239e..591f145 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -21,9 +21,13 @@ import static org.apache.beam.sdk.util.Structs.addString;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectStreamException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
@@ -163,28 +167,104 @@ public class AvroCoder<T> extends StandardCoder<T> {
};
private final Class<T> type;
- private transient Schema schema;
-
- private final String schemaStr;
+ private final SerializableSchemaSupplier schemaSupplier;
private final List<String> nonDeterministicReasons;
// Factories allocated by .get() are thread-safe and immutable.
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
+
+ /**
+ * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
+ * for Kryo to serialize an {@link AvroCoder} as a final field.
+ */
+ private static class EmptyOnDeserializationThreadLocal<T>
+ extends ThreadLocal<T> implements Serializable {
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ }
+
+ private void readObjectNoData() throws ObjectStreamException {
+ }
+ }
+
+ /**
+ * A {@link Serializable} object that holds the {@link String} version of a {@link Schema}.
+ * This is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage
+ * of the {@link #readResolve} method.
+ */
+ private static class SerializableSchemaString implements Serializable {
+ private final String schema;
+ private SerializableSchemaString(String schema) {
+ this.schema = schema;
+ }
+
+ private Object readResolve() throws IOException, ClassNotFoundException {
+ return new SerializableSchemaSupplier(Schema.parse(schema));
+ }
+ }
+
+ /**
+ * A {@link Serializable} object that delegates to the {@link SerializableSchemaString} via
+ * {@link Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize
+ * Java's serialization and hence is able to encode the {@link Schema} object directly.
+ */
+ private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
+ @SuppressFBWarnings(justification = "writeReplace makes this object serializable. This is a "
+ + "limitation of FindBugs as discussed here: http://stackoverflow.com/questions/"
+ + "26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern",
+ value = "SE_BAD_FIELD")
+ private final Schema schema;
+ private SerializableSchemaSupplier(Schema schema) {
+ this.schema = schema;
+ }
+
+ private Object writeReplace() {
+ return new SerializableSchemaString(schema.toString());
+ }
+
+ @Override
+ public Schema get() {
+ return schema;
+ }
+ }
+
// Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
// these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
// an inner coder.
- private transient ThreadLocal<BinaryDecoder> memoizedDecoder;
- private transient ThreadLocal<BinaryEncoder> memoizedEncoder;
- private transient ThreadLocal<DatumWriter<T>> memoizedWriter;
- private transient ThreadLocal<DatumReader<T>> memoizedReader;
+ private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder;
+ private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder;
+ private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
+ private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
protected AvroCoder(Class<T> type, Schema schema) {
this.type = type;
- this.schema = schema;
- this.schemaStr = schema.toString();
+ this.schemaSupplier = new SerializableSchemaSupplier(schema);
nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
+
+ // Decoder and Encoder start off null for each thread. They are allocated and potentially
+ // reused inside encode/decode.
+ this.decoder = new EmptyOnDeserializationThreadLocal<>();
+ this.encoder = new EmptyOnDeserializationThreadLocal<>();
+
+ // Reader and writer are allocated once per thread and are "final" for thread-local Coder
+ // instance.
+ this.reader = new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
+ @Override
+ public DatumReader<T> initialValue() {
+ return createDatumReader();
+ }
+ };
+ this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+ @Override
+ public DatumWriter<T> initialValue() {
+ return createDatumWriter();
+ }
+ };
}
/**
@@ -230,22 +310,20 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Override
public void encode(T value, OutputStream outStream, Context context) throws IOException {
// Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
- ThreadLocal<BinaryEncoder> encoder = getEncoder();
BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
// Save the potentially-new instance for reuse later.
encoder.set(encoderInstance);
- getWriter().get().write(value, encoderInstance);
+ writer.get().write(value, encoderInstance);
// Direct binary encoder does not buffer any data and need not be flushed.
}
@Override
public T decode(InputStream inStream, Context context) throws IOException {
// Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
- ThreadLocal<BinaryDecoder> decoder = getDecoder();
BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
// Save the potentially-new instance for later.
decoder.set(decoderInstance);
- return getReader().get().read(null, decoderInstance);
+ return reader.get().read(null, decoderInstance);
}
@Override
@@ -257,7 +335,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
public CloudObject asCloudObject() {
CloudObject result = super.asCloudObject();
addString(result, "type", type.getName());
- addString(result, "schema", getSchema().toString());
+ addString(result, "schema", schemaSupplier.get().toString());
return result;
}
@@ -283,9 +361,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Deprecated
public DatumReader<T> createDatumReader() {
if (type.equals(GenericRecord.class)) {
- return new GenericDatumReader<>(getSchema());
+ return new GenericDatumReader<>(schemaSupplier.get());
} else {
- return new ReflectDatumReader<>(getSchema());
+ return new ReflectDatumReader<>(schemaSupplier.get());
}
}
@@ -298,9 +376,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Deprecated
public DatumWriter<T> createDatumWriter() {
if (type.equals(GenericRecord.class)) {
- return new GenericDatumWriter<>(getSchema());
+ return new GenericDatumWriter<>(schemaSupplier.get());
} else {
- return new ReflectDatumWriter<>(getSchema());
+ return new ReflectDatumWriter<>(schemaSupplier.get());
}
}
@@ -308,69 +386,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
* Returns the schema used by this coder.
*/
public Schema getSchema() {
- return getMemoizedSchema();
- }
-
- /**
- * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily.
- */
- private ThreadLocal<BinaryDecoder> getDecoder() {
- if (memoizedDecoder == null) {
- memoizedDecoder = new ThreadLocal<>();
- }
- return memoizedDecoder;
- }
-
- /**
- * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily.
- */
- private ThreadLocal<BinaryEncoder> getEncoder() {
- if (memoizedEncoder == null) {
- memoizedEncoder = new ThreadLocal<>();
- }
- return memoizedEncoder;
- }
-
- /**
- * Get the memoized {@link DatumReader}, possibly initializing it lazily.
- */
- private ThreadLocal<DatumReader<T>> getReader() {
- if (memoizedReader == null) {
- memoizedReader = new ThreadLocal<DatumReader<T>>() {
- @Override
- public DatumReader<T> initialValue() {
- return createDatumReader();
- }
- };
- }
- return memoizedReader;
- }
-
- /**
- * Get the memoized {@link DatumWriter}, possibly initializing it lazily.
- */
- private ThreadLocal<DatumWriter<T>> getWriter() {
- if (memoizedWriter == null) {
- memoizedWriter = new ThreadLocal<DatumWriter<T>>() {
- @Override
- public DatumWriter<T> initialValue() {
- return createDatumWriter();
- }
- };
- }
- return memoizedWriter;
- }
-
- /**
- * Get the {@link Schema}, possibly initializing it lazily by parsing {@link
- * AvroCoder#schemaStr}.
- */
- private Schema getMemoizedSchema() {
- if (schema == null) {
- Schema.Parser parser = new Schema.Parser();
- schema = parser.parse(schemaStr);
- }
- return schema;
+ return schemaSupplier.get();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae3ec52/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index f2373d1..adfa0d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
@@ -39,10 +42,6 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -78,6 +77,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.objenesis.strategy.StdInstantiatorStrategy;
/** Tests for {@link AvroCoder}. */
@RunWith(JUnit4.class)
@@ -189,20 +189,35 @@ public class AvroCoderTest {
//Kryo instantiation
Kryo kryo = new Kryo();
- kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+ kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
- //Serialization of object
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, coder);
- output.close();
+ //Serialization of object without any memoization
+ ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream();
+ try (Output output = new Output(coderWithoutMemoizationBos)) {
+ kryo.writeObject(output, coder);
+ }
- //De-serialization of object
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- Input input = new Input(bis);
- AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class);
+ // Force thread local memoization to store values.
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
- CoderProperties.coderDecodeEncodeEqual(copied, value);
+ // Serialization of object with memoized fields
+ ByteArrayOutputStream coderWithMemoizationBos = new ByteArrayOutputStream();
+ try (Output output = new Output(coderWithMemoizationBos)) {
+ kryo.writeObject(output, coder);
+ }
+
+ // Copy empty and memoized variants of the Coder
+ ByteArrayInputStream bisWithoutMemoization =
+ new ByteArrayInputStream(coderWithoutMemoizationBos.toByteArray());
+ AvroCoder<Pojo> copiedWithoutMemoization =
+ (AvroCoder<Pojo>) kryo.readObject(new Input(bisWithoutMemoization), AvroCoder.class);
+ ByteArrayInputStream bisWithMemoization =
+ new ByteArrayInputStream(coderWithMemoizationBos.toByteArray());
+ AvroCoder<Pojo> copiedWithMemoization =
+ (AvroCoder<Pojo>) kryo.readObject(new Input(bisWithMemoization), AvroCoder.class);
+
+ CoderProperties.coderDecodeEncodeEqual(copiedWithoutMemoization, value);
+ CoderProperties.coderDecodeEncodeEqual(copiedWithMemoization, value);
}
@Test