You are viewing a plain-text version of this content; the hyperlink to its canonical location was not preserved in this rendering.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/08 16:02:24 UTC
[2/3] incubator-beam git commit: Swap to use Serializable ThreadLocal and serializable Schema holder.
Swap to use Serializable ThreadLocal and serializable Schema holder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fae3ec52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fae3ec52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fae3ec52
Branch: refs/heads/master
Commit: fae3ec52970f90a0174b789e99022d3095baf2a4
Parents: 06c1846
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 31 11:38:45 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:47:57 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 178 ++++++++++---------
.../apache/beam/sdk/coders/AvroCoderTest.java | 45 +++--
2 files changed, 127 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae3ec52/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 4f0239e..591f145 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -21,9 +21,13 @@ import static org.apache.beam.sdk.util.Structs.addString;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectStreamException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
@@ -163,28 +167,104 @@ public class AvroCoder<T> extends StandardCoder<T> {
};
private final Class<T> type;
- private transient Schema schema;
-
- private final String schemaStr;
+ private final SerializableSchemaSupplier schemaSupplier;
private final List<String> nonDeterministicReasons;
// Factories allocated by .get() are thread-safe and immutable.
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
+
+ /**
+ * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
+ * for Kryo to serialize an {@link AvroCoder} as a final field.
+ */
+ private static class EmptyOnDeserializationThreadLocal<T>
+ extends ThreadLocal<T> implements Serializable {
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ }
+
+ private void readObjectNoData() throws ObjectStreamException {
+ }
+ }
+
+ /**
+ * A {@link Serializable} object that holds the {@link String} version of a {@link Schema}.
+ * This is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage
+ * of the {@link #readResolve} method.
+ */
+ private static class SerializableSchemaString implements Serializable {
+ private final String schema;
+ private SerializableSchemaString(String schema) {
+ this.schema = schema;
+ }
+
+ private Object readResolve() throws IOException, ClassNotFoundException {
+ return new SerializableSchemaSupplier(Schema.parse(schema));
+ }
+ }
+
+ /**
+ * A {@link Serializable} object that delegates to the {@link SerializableSchemaString} via
+ * {@link Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize
+ * Java's serialization and hence is able to encode the {@link Schema} object directly.
+ */
+ private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
+ @SuppressFBWarnings(justification = "writeReplace makes this object serializable. This is a "
+ + "limitation of FindBugs as discussed here: http://stackoverflow.com/questions/"
+ + "26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern",
+ value = "SE_BAD_FIELD")
+ private final Schema schema;
+ private SerializableSchemaSupplier(Schema schema) {
+ this.schema = schema;
+ }
+
+ private Object writeReplace() {
+ return new SerializableSchemaString(schema.toString());
+ }
+
+ @Override
+ public Schema get() {
+ return schema;
+ }
+ }
+
// Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
// these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
// an inner coder.
- private transient ThreadLocal<BinaryDecoder> memoizedDecoder;
- private transient ThreadLocal<BinaryEncoder> memoizedEncoder;
- private transient ThreadLocal<DatumWriter<T>> memoizedWriter;
- private transient ThreadLocal<DatumReader<T>> memoizedReader;
+ private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder;
+ private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder;
+ private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
+ private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
protected AvroCoder(Class<T> type, Schema schema) {
this.type = type;
- this.schema = schema;
- this.schemaStr = schema.toString();
+ this.schemaSupplier = new SerializableSchemaSupplier(schema);
nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
+
+ // Decoder and Encoder start off null for each thread. They are allocated and potentially
+ // reused inside encode/decode.
+ this.decoder = new EmptyOnDeserializationThreadLocal<>();
+ this.encoder = new EmptyOnDeserializationThreadLocal<>();
+
+ // Reader and writer are allocated once per thread and are "final" for thread-local Coder
+ // instance.
+ this.reader = new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
+ @Override
+ public DatumReader<T> initialValue() {
+ return createDatumReader();
+ }
+ };
+ this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+ @Override
+ public DatumWriter<T> initialValue() {
+ return createDatumWriter();
+ }
+ };
}
/**
@@ -230,22 +310,20 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Override
public void encode(T value, OutputStream outStream, Context context) throws IOException {
// Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
- ThreadLocal<BinaryEncoder> encoder = getEncoder();
BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
// Save the potentially-new instance for reuse later.
encoder.set(encoderInstance);
- getWriter().get().write(value, encoderInstance);
+ writer.get().write(value, encoderInstance);
// Direct binary encoder does not buffer any data and need not be flushed.
}
@Override
public T decode(InputStream inStream, Context context) throws IOException {
// Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
- ThreadLocal<BinaryDecoder> decoder = getDecoder();
BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
// Save the potentially-new instance for later.
decoder.set(decoderInstance);
- return getReader().get().read(null, decoderInstance);
+ return reader.get().read(null, decoderInstance);
}
@Override
@@ -257,7 +335,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
public CloudObject asCloudObject() {
CloudObject result = super.asCloudObject();
addString(result, "type", type.getName());
- addString(result, "schema", getSchema().toString());
+ addString(result, "schema", schemaSupplier.get().toString());
return result;
}
@@ -283,9 +361,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Deprecated
public DatumReader<T> createDatumReader() {
if (type.equals(GenericRecord.class)) {
- return new GenericDatumReader<>(getSchema());
+ return new GenericDatumReader<>(schemaSupplier.get());
} else {
- return new ReflectDatumReader<>(getSchema());
+ return new ReflectDatumReader<>(schemaSupplier.get());
}
}
@@ -298,9 +376,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
@Deprecated
public DatumWriter<T> createDatumWriter() {
if (type.equals(GenericRecord.class)) {
- return new GenericDatumWriter<>(getSchema());
+ return new GenericDatumWriter<>(schemaSupplier.get());
} else {
- return new ReflectDatumWriter<>(getSchema());
+ return new ReflectDatumWriter<>(schemaSupplier.get());
}
}
@@ -308,69 +386,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
* Returns the schema used by this coder.
*/
public Schema getSchema() {
- return getMemoizedSchema();
- }
-
- /**
- * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily.
- */
- private ThreadLocal<BinaryDecoder> getDecoder() {
- if (memoizedDecoder == null) {
- memoizedDecoder = new ThreadLocal<>();
- }
- return memoizedDecoder;
- }
-
- /**
- * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily.
- */
- private ThreadLocal<BinaryEncoder> getEncoder() {
- if (memoizedEncoder == null) {
- memoizedEncoder = new ThreadLocal<>();
- }
- return memoizedEncoder;
- }
-
- /**
- * Get the memoized {@link DatumReader}, possibly initializing it lazily.
- */
- private ThreadLocal<DatumReader<T>> getReader() {
- if (memoizedReader == null) {
- memoizedReader = new ThreadLocal<DatumReader<T>>() {
- @Override
- public DatumReader<T> initialValue() {
- return createDatumReader();
- }
- };
- }
- return memoizedReader;
- }
-
- /**
- * Get the memoized {@link DatumWriter}, possibly initializing it lazily.
- */
- private ThreadLocal<DatumWriter<T>> getWriter() {
- if (memoizedWriter == null) {
- memoizedWriter = new ThreadLocal<DatumWriter<T>>() {
- @Override
- public DatumWriter<T> initialValue() {
- return createDatumWriter();
- }
- };
- }
- return memoizedWriter;
- }
-
- /**
- * Get the {@link Schema}, possibly initializing it lazily by parsing {@link
- * AvroCoder#schemaStr}.
- */
- private Schema getMemoizedSchema() {
- if (schema == null) {
- Schema.Parser parser = new Schema.Parser();
- schema = parser.parse(schemaStr);
- }
- return schema;
+ return schemaSupplier.get();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae3ec52/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index f2373d1..adfa0d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
@@ -39,10 +42,6 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -78,6 +77,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.objenesis.strategy.StdInstantiatorStrategy;
/** Tests for {@link AvroCoder}. */
@RunWith(JUnit4.class)
@@ -189,20 +189,35 @@ public class AvroCoderTest {
//Kryo instantiation
Kryo kryo = new Kryo();
- kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+ kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
- //Serialization of object
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, coder);
- output.close();
+ //Serialization of object without any memoization
+ ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream();
+ try (Output output = new Output(coderWithoutMemoizationBos)) {
+ kryo.writeObject(output, coder);
+ }
- //De-serialization of object
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- Input input = new Input(bis);
- AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class);
+ // Force thread local memoization to store values.
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
- CoderProperties.coderDecodeEncodeEqual(copied, value);
+ // Serialization of object with memoized fields
+ ByteArrayOutputStream coderWithMemoizationBos = new ByteArrayOutputStream();
+ try (Output output = new Output(coderWithMemoizationBos)) {
+ kryo.writeObject(output, coder);
+ }
+
+ // Copy empty and memoized variants of the Coder
+ ByteArrayInputStream bisWithoutMemoization =
+ new ByteArrayInputStream(coderWithoutMemoizationBos.toByteArray());
+ AvroCoder<Pojo> copiedWithoutMemoization =
+ (AvroCoder<Pojo>) kryo.readObject(new Input(bisWithoutMemoization), AvroCoder.class);
+ ByteArrayInputStream bisWithMemoization =
+ new ByteArrayInputStream(coderWithMemoizationBos.toByteArray());
+ AvroCoder<Pojo> copiedWithMemoization =
+ (AvroCoder<Pojo>) kryo.readObject(new Input(bisWithMemoization), AvroCoder.class);
+
+ CoderProperties.coderDecodeEncodeEqual(copiedWithoutMemoization, value);
+ CoderProperties.coderDecodeEncodeEqual(copiedWithMemoization, value);
}
@Test