You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/23 21:02:10 UTC

[1/3] beam git commit: Use a ThreadLocal for Marshaller/Unmarshaller in JAXBCoder

Repository: beam
Updated Branches:
  refs/heads/master 9db5f746a -> daed01a69


Use a ThreadLocal for Marshaller/Unmarshaller in JAXBCoder

This allows reuse of thread-unsafe marshallers and unmarshallers when
encoding and decoding elements, while the coder itself remains thread-safe.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf0b990b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf0b990b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf0b990b

Branch: refs/heads/master
Commit: cf0b990b0336f46b4d4775c93e86bd2310d622b5
Parents: e1ee05e
Author: Kai Jiang <ji...@gmail.com>
Authored: Thu Jan 19 05:06:13 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 11:46:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/JAXBCoder.java   | 36 +++++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cf0b990b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 0a4f9cc..ea636fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -30,6 +30,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -45,6 +46,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   private final Class<T> jaxbClass;
   private final TypeDescriptor<T> typeDescriptor;
   private transient volatile JAXBContext jaxbContext;
+  private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
+  private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller;
 
   public Class<T> getJAXBClass() {
     return jaxbClass;
@@ -53,6 +56,28 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   private JAXBCoder(Class<T> jaxbClass) {
     this.jaxbClass = jaxbClass;
     this.typeDescriptor = TypeDescriptor.of(jaxbClass);
+    this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
+      @Override
+      protected Marshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createMarshaller();
+        } catch (JAXBException e) {
+          throw new RuntimeException("Error when creating marshaller from JAXB Context.", e);
+        }
+      }
+    };
+    this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() {
+      @Override
+      protected Unmarshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createUnmarshaller();
+        } catch (Exception e) {
+          throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e);
+        }
+      }
+    };
   }
 
   /**
@@ -68,9 +93,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     try {
-      JAXBContext jaxbContext = getContext();
-      // TODO: Consider caching in a ThreadLocal if this impacts performance
-      Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
       if (!context.isWholeStream) {
         try {
           long size = getEncodedElementByteSize(value, Context.OUTER);
@@ -83,7 +105,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
         }
       }
 
-      jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
+      jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
     } catch (JAXBException e) {
       throw new CoderException(e);
     }
@@ -92,17 +114,13 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     try {
-      JAXBContext jaxbContext = getContext();
-      // TODO: Consider caching in a ThreadLocal if this impacts performance
-      Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
       InputStream stream = inStream;
       if (!context.isWholeStream) {
         long limit = VarInt.decodeLong(inStream);
         stream = ByteStreams.limit(inStream, limit);
       }
       @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream));
+      T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream));
       return obj;
     } catch (JAXBException e) {
       throw new CoderException(e);


[3/3] beam git commit: This closes #1795

Posted by tg...@apache.org.
This closes #1795


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/daed01a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/daed01a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/daed01a6

Branch: refs/heads/master
Commit: daed01a69af45cee3d3f250e1b60afde82d71e84
Parents: 9db5f74 cf0b990
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jan 23 13:00:42 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 13:00:45 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 19 +---------
 .../org/apache/beam/sdk/coders/JAXBCoder.java   | 36 +++++++++++++-----
 .../util/EmptyOnDeserializationThreadLocal.java | 39 ++++++++++++++++++++
 3 files changed, 67 insertions(+), 27 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Refactor EmptyOnDeserializationThreadLocal to util

Posted by tg...@apache.org.
Refactor EmptyOnDeserializationThreadLocal to util

This is a serialization-capable ThreadLocal used in AvroCoder.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1ee05e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1ee05e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1ee05e8

Branch: refs/heads/master
Commit: e1ee05e80e54e61b511a71a98a66868b81745533
Parents: 9db5f74
Author: Kai Jiang <ji...@gmail.com>
Authored: Mon Jan 23 11:45:36 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 11:46:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 19 +---------
 .../util/EmptyOnDeserializationThreadLocal.java | 39 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e1ee05e8/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 2c88c9a..9316224 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Supplier;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectStreamException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Field;
@@ -59,6 +58,7 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.util.ClassUtils;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -176,23 +176,6 @@ public class AvroCoder<T> extends StandardCoder<T> {
   private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
 
   /**
-   * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
-   * for Kryo to serialize an {@link AvroCoder} as a final field.
-   */
-  private static class EmptyOnDeserializationThreadLocal<T>
-      extends ThreadLocal<T> implements Serializable {
-    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-    }
-
-    private void readObjectNoData() throws ObjectStreamException {
-    }
-  }
-
-  /**
    * A {@link Serializable} object that holds the {@link String} version of a {@link Schema}.
    * This is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage
    * of the {@link #readResolve} method.

http://git-wip-us.apache.org/repos/asf/beam/blob/e1ee05e8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java
new file mode 100644
index 0000000..890728a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/EmptyOnDeserializationThreadLocal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * A {@link Serializable} {@link ThreadLocal} which discards any "stored" objects. This allows
+ * for Kryo to serialize a {@link Coder} as a final field.
+ */
+public class EmptyOnDeserializationThreadLocal<T> extends ThreadLocal<T> implements Serializable {
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+            throws IOException, ClassNotFoundException {
+    }
+
+    private void readObjectNoData() throws ObjectStreamException {
+    }
+}