You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/23 21:02:10 UTC
[1/3] beam git commit: Use a ThreadLocal for
Marshaller/Unmarshaller in JAXBCoder
Repository: beam
Updated Branches:
refs/heads/master 9db5f746a -> daed01a69
Use a ThreadLocal for Marshaller/Unmarshaller in JAXBCoder
This allows reuse of thread-unsafe marshallers and unmarshallers when
encoding and decoding elements, while the coder itself remains thread-safe.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf0b990b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf0b990b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf0b990b
Branch: refs/heads/master
Commit: cf0b990b0336f46b4d4775c93e86bd2310d622b5
Parents: e1ee05e
Author: Kai Jiang <ji...@gmail.com>
Authored: Thu Jan 19 05:06:13 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 11:46:48 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/JAXBCoder.java | 36 +++++++++++++++-----
1 file changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cf0b990b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 0a4f9cc..ea636fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -30,6 +30,7 @@ import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -45,6 +46,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
private final Class<T> jaxbClass;
private final TypeDescriptor<T> typeDescriptor;
private transient volatile JAXBContext jaxbContext;
+ private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
+ private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller;
public Class<T> getJAXBClass() {
return jaxbClass;
@@ -53,6 +56,28 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
private JAXBCoder(Class<T> jaxbClass) {
this.jaxbClass = jaxbClass;
this.typeDescriptor = TypeDescriptor.of(jaxbClass);
+ this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
+ @Override
+ protected Marshaller initialValue() {
+ try {
+ JAXBContext jaxbContext = getContext();
+ return jaxbContext.createMarshaller();
+ } catch (JAXBException e) {
+ throw new RuntimeException("Error when creating marshaller from JAXB Context.", e);
+ }
+ }
+ };
+ this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() {
+ @Override
+ protected Unmarshaller initialValue() {
+ try {
+ JAXBContext jaxbContext = getContext();
+ return jaxbContext.createUnmarshaller();
+ } catch (Exception e) {
+ throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e);
+ }
+ }
+ };
}
/**
@@ -68,9 +93,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
public void encode(T value, OutputStream outStream, Context context)
throws CoderException, IOException {
try {
- JAXBContext jaxbContext = getContext();
- // TODO: Consider caching in a ThreadLocal if this impacts performance
- Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
if (!context.isWholeStream) {
try {
long size = getEncodedElementByteSize(value, Context.OUTER);
@@ -83,7 +105,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
}
}
- jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
+ jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
} catch (JAXBException e) {
throw new CoderException(e);
}
@@ -92,17 +114,13 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
@Override
public T decode(InputStream inStream, Context context) throws CoderException, IOException {
try {
- JAXBContext jaxbContext = getContext();
- // TODO: Consider caching in a ThreadLocal if this impacts performance
- Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
InputStream stream = inStream;
if (!context.isWholeStream) {
long limit = VarInt.decodeLong(inStream);
stream = ByteStreams.limit(inStream, limit);
}
@SuppressWarnings("unchecked")
- T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream));
+ T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream));
return obj;
} catch (JAXBException e) {
throw new CoderException(e);
[3/3] beam git commit: This closes #1795
Posted by tg...@apache.org.
This closes #1795
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/daed01a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/daed01a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/daed01a6
Branch: refs/heads/master
Commit: daed01a69af45cee3d3f250e1b60afde82d71e84
Parents: 9db5f74 cf0b990
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jan 23 13:00:42 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 13:00:45 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 19 +---------
.../org/apache/beam/sdk/coders/JAXBCoder.java | 36 +++++++++++++-----
.../util/EmptyOnDeserializationThreadLocal.java | 39 ++++++++++++++++++++
3 files changed, 67 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Refactor EmptyOnDeserializationThreadLocal to
util
Posted by tg...@apache.org.
Refactor EmptyOnDeserializationThreadLocal to util
This is a serialization-capable ThreadLocal used in AvroCoder.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1ee05e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1ee05e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1ee05e8
Branch: refs/heads/master
Commit: e1ee05e80e54e61b511a71a98a66868b81745533
Parents: 9db5f74
Author: Kai Jiang <ji...@gmail.com>
Authored: Mon Jan 23 11:45:36 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 11:46:48 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 19 +---------
.../util/EmptyOnDeserializationThreadLocal.java | 39 ++++++++++++++++++++
2 files changed, 40 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e1ee05e8/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 2c88c9a..9316224 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
@@ -59,6 +58,7 @@ import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.ClassUtils;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -176,23 +176,6 @@ public class AvroCoder<T> extends StandardCoder<T> {
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
/**
- * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
- * for Kryo to serialize an {@link AvroCoder} as a final field.
- */
- private static class EmptyOnDeserializationThreadLocal<T>
- extends ThreadLocal<T> implements Serializable {
- private void writeObject(java.io.ObjectOutputStream out) throws IOException {
- }
-
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- }
-
- private void readObjectNoData() throws ObjectStreamException {
- }
- }
-
- /**
* A {@link Serializable} object that holds the {@link String} version of a {@link Schema}.
* This is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage
* of the {@link #readResolve} method.
http://git-wip-us.apache.org/repos/asf/beam/blob/e1ee05e8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java
new file mode 100644
index 0000000..890728a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
+ * for Kryo to serialize a {@link Coder} as a final field.
+ */
+public class EmptyOnDeserializationThreadLocal<T> extends ThreadLocal<T> implements Serializable {
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ }
+
+ private void readObjectNoData() throws ObjectStreamException {
+ }
+}