You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/01 16:35:57 UTC
[1/2] beam git commit: Use a new ReflectData for each AvroCoder
instance
Repository: beam
Updated Branches:
refs/heads/master 535761a74 -> 8a1fab1d1
Use a new ReflectData for each AvroCoder instance
This addresses an issue where Avro might have cached a class
from a different ClassLoader.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e335081
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e335081
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e335081
Branch: refs/heads/master
Commit: 7e33508187f49c359ce585ae173f3aee5658eb35
Parents: 535761a
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Apr 29 14:06:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 09:35:17 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 72 ++++++++++++++------
.../apache/beam/sdk/coders/AvroCoderTest.java | 65 ++++++++++++++++++
.../beam/sdk/coders/AvroCoderTestPojo.java | 51 ++++++++++++++
3 files changed, 167 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7e335081/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 1d7cce5..1e01f1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.coders;
import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -114,7 +115,7 @@ public class AvroCoder<T> extends CustomCoder<T> {
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> clazz) {
- return new AvroCoder<>(clazz, ReflectData.get().getSchema(clazz));
+ return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz));
}
/**
@@ -198,6 +199,25 @@ public class AvroCoder<T> extends CustomCoder<T> {
}
}
+ /**
+ * A {@link Serializable} {@link Supplier} of {@link ReflectData}. Each call to {@link #get()}
+ * builds a new instance from the {@link ClassLoader} of the class passed to the constructor.
+ */
+ private static class SerializableReflectDataSupplier
+ implements Serializable, Supplier<ReflectData> {
+
+ private final Class<?> clazz;
+
+ private SerializableReflectDataSupplier(Class<?> clazz) {
+ this.clazz = clazz;
+ }
+
+ @Override
+ public ReflectData get() {
+ return new ReflectData(clazz.getClassLoader());
+ }
+ }
+
// Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
// these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
// an inner coder.
@@ -206,6 +226,9 @@ public class AvroCoder<T> extends CustomCoder<T> {
private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
+ // Lazily re-instantiated after deserialization
+ private final Supplier<ReflectData> reflectData;
+
protected AvroCoder(Class<T> type, Schema schema) {
this.type = type;
this.schemaSupplier = new SerializableSchemaSupplier(schema);
@@ -217,26 +240,33 @@ public class AvroCoder<T> extends CustomCoder<T> {
this.decoder = new EmptyOnDeserializationThreadLocal<>();
this.encoder = new EmptyOnDeserializationThreadLocal<>();
- // Reader and writer are allocated once per thread and are "final" for thread-local Coder
- // instance.
- this.reader = new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
- private final AvroCoder<T> myCoder = AvroCoder.this;
- @Override
- public DatumReader<T> initialValue() {
- return myCoder.getType().equals(GenericRecord.class)
- ? new GenericDatumReader<T>(myCoder.getSchema())
- : new ReflectDatumReader<T>(myCoder.getSchema());
- }
- };
- this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
- private final AvroCoder<T> myCoder = AvroCoder.this;
- @Override
- public DatumWriter<T> initialValue() {
- return myCoder.getType().equals(GenericRecord.class)
- ? new GenericDatumWriter<T>(myCoder.getSchema())
- : new ReflectDatumWriter<T>(myCoder.getSchema());
- }
- };
+ this.reflectData = Suppliers.memoize(new SerializableReflectDataSupplier(getType()));
+
+ // Reader and writer are allocated once per thread per Coder
+ this.reader =
+ new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
+ private final AvroCoder<T> myCoder = AvroCoder.this;
+
+ @Override
+ public DatumReader<T> initialValue() {
+ return myCoder.getType().equals(GenericRecord.class)
+ ? new GenericDatumReader<T>(myCoder.getSchema())
+ : new ReflectDatumReader<T>(
+ myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get());
+ }
+ };
+
+ this.writer =
+ new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+ private final AvroCoder<T> myCoder = AvroCoder.this;
+
+ @Override
+ public DatumWriter<T> initialValue() {
+ return myCoder.getType().equals(GenericRecord.class)
+ ? new GenericDatumWriter<T>(myCoder.getSchema())
+ : new ReflectDatumWriter<T>(myCoder.getSchema(), myCoder.reflectData.get());
+ }
+ };
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e335081/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index cbc4d24..e1d5359 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -26,8 +26,10 @@ import static org.junit.Assert.fail;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
@@ -64,6 +66,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -159,8 +163,69 @@ public class AvroCoderTest {
CoderProperties.coderConsistentWithEquals(copy, pojo, equalPojo);
CoderProperties.coderConsistentWithEquals(coder, pojo, otherPojo);
CoderProperties.coderConsistentWithEquals(copy, pojo, otherPojo);
+ }
+
+ /**
+ * A {@link ClassLoader} that defines its own copy of AvroCoderTestPojo; other loads go to the parent.
+ */
+ private static class InterceptingUrlClassLoader extends ClassLoader {
+ @Override
+ public Class<?> loadClass(String name) throws ClassNotFoundException {
+ if (name.equals(AvroCoderTestPojo.class.getName())) {
+ // Hack: read the class file bytes through the parent loader and define a fresh copy here.
+ try {
+ String classAsResource = name.replace('.', '/') + ".class";
+ byte[] classBytes =
+ ByteStreams.toByteArray(getParent().getResourceAsStream(classAsResource));
+ return defineClass(name, classBytes, 0, classBytes.length);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return getParent().loadClass(name);
+ }
+ }
+ }
+
+ /**
+ * Tests that {@link AvroCoder} works around issues in Avro where cached classes might be
+ * from the wrong ClassLoader, causing confusing "Cannot cast X to X" error messages.
+ */
+ @Test
+ public void testTwoClassLoaders() throws Exception {
+ ClassLoader loader1 = new InterceptingUrlClassLoader();
+ ClassLoader loader2 = new InterceptingUrlClassLoader();
+
+ Class<?> pojoClass1 = loader1.loadClass(AvroCoderTestPojo.class.getName());
+ Class<?> pojoClass2 = loader2.loadClass(AvroCoderTestPojo.class.getName());
+
+ Object pojo1 = InstanceBuilder.ofType(pojoClass1).withArg(String.class, "hello").build();
+ Object pojo2 = InstanceBuilder.ofType(pojoClass2).withArg(String.class, "goodbye").build();
+
+ // Confirm that the two same-named classes are incompatible
+ try {
+ pojoClass2.cast(pojo1);
+ fail("Expected ClassCastException; without it, this test is vacuous");
+ } catch (ClassCastException e) {
+ // Expected: the classes come from different loaders, so the cast must fail.
+ }
+
+ // The first coder is expected to populate the Avro SpecificData cache.
+ // The second coder is expected to be corrupted if the caching is done wrong.
+ AvroCoder<Object> avroCoder1 = (AvroCoder) AvroCoder.of(pojoClass1);
+ AvroCoder<Object> avroCoder2 = (AvroCoder) AvroCoder.of(pojoClass2);
+
+ Object cloned1 = CoderUtils.clone(avroCoder1, pojo1);
+ Object cloned2 = CoderUtils.clone(avroCoder2, pojo2);
+
+ Class<?> class1 = cloned1.getClass();
+ Class<?> class2 = cloned2.getClass();
+ // Confirming that the uncorrupted coder is fine
+ pojoClass1.cast(cloned1);
+ // Confirmed to fail prior to the fix
+ pojoClass2.cast(cloned2);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e335081/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java
new file mode 100644
index 0000000..dd5d419
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+
+/** A top-level Pojo for use in tests. NOTE(review): equals() will NPE when other.text is null. */
+class AvroCoderTestPojo {
+
+ public String text;
+
+ // Empty constructor required for Avro decoding.
+ @SuppressWarnings("unused")
+ public AvroCoderTestPojo() {
+ }
+
+ public AvroCoderTestPojo(String text) {
+ this.text = text;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return (other instanceof AvroCoderTestPojo) && ((AvroCoderTestPojo) other).text.equals(text);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(AvroCoderTestPojo.class, text);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("text", text).toString();
+ }
+}
[2/2] beam git commit: [BEAM-1970] Use a new ReflectData for each
AvroCoder instance
Posted by lc...@apache.org.
[BEAM-1970] Use a new ReflectData for each AvroCoder instance
This closes #2783
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8a1fab1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8a1fab1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8a1fab1d
Branch: refs/heads/master
Commit: 8a1fab1d12ab49c5b0287ea8aa8f93fdf36fe0e4
Parents: 535761a 7e33508
Author: Luke Cwik <lc...@google.com>
Authored: Mon May 1 09:35:46 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 09:35:46 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 72 ++++++++++++++------
.../apache/beam/sdk/coders/AvroCoderTest.java | 65 ++++++++++++++++++
.../beam/sdk/coders/AvroCoderTestPojo.java | 51 ++++++++++++++
3 files changed, 167 insertions(+), 21 deletions(-)
----------------------------------------------------------------------