You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 14:16:40 UTC

[1/2] beam git commit: [BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules

Repository: beam
Updated Branches:
  refs/heads/master f43b61af4 -> 02b72d664


[BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f1c8972
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f1c8972
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f1c8972

Branch: refs/heads/master
Commit: 1f1c897264ea7ab050c8644344f6e2648af9ae4a
Parents: f43b61a
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 17:17:11 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:16:04 2017 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  4 +
 .../utils/SerializablePipelineOptions.java      | 15 ++-
 .../translation/utils/PipelineOptionsTest.java  | 98 ++++++++++++++++----
 3 files changed, 99 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 36252e8..aa4bddf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -63,6 +63,10 @@
       <version>${apex.malhar.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 1a47ed5..14476b5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.apex.translation.utils;
 
+import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.Externalizable;
 import java.io.IOException;
@@ -27,6 +28,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * A wrapper to enable serialization of {@link PipelineOptions}.
@@ -51,13 +53,13 @@ public class SerializablePipelineOptions implements Externalizable {
 
   @Override
   public void writeExternal(ObjectOutput out) throws IOException {
-    out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
+    out.writeUTF(createMapper().writeValueAsString(pipelineOptions));
   }
 
   @Override
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
     String s = in.readUTF();
-    this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
+    this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class)
         .as(ApexPipelineOptions.class);
 
     if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
@@ -66,4 +68,13 @@ public class SerializablePipelineOptions implements Externalizable {
     }
   }
 
+  /**
+   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
+   * for user specified configuration injection into the ObjectMapper. This supports user custom
+   * types on {@link PipelineOptions}.
+   */
+  private static ObjectMapper createMapper() {
+    return new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
index d5eb9a9..118ff99 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
@@ -23,15 +23,25 @@ import static org.junit.Assert.assertNotNull;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-
+import java.io.IOException;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -49,36 +59,92 @@ public class PipelineOptionsTest {
     void setTestOption(String value);
   }
 
-  private static class MyOptionsWrapper {
-    private MyOptionsWrapper() {
+  private static class OptionsWrapper {
+    private OptionsWrapper() {
       this(null); // required for Kryo
     }
-    private MyOptionsWrapper(ApexPipelineOptions options) {
+    private OptionsWrapper(ApexPipelineOptions options) {
       this.options = new SerializablePipelineOptions(options);
     }
     @Bind(JavaSerializer.class)
     private final SerializablePipelineOptions options;
   }
 
-  private static MyOptions options;
-
-  private static final String[] args = new String[]{"--testOption=nothing"};
+  @Test
+  public void testSerialization() {
+    OptionsWrapper wrapper = new OptionsWrapper(
+        PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class));
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    FSStorageAgent.store(bos, wrapper);
 
-  @BeforeClass
-  public static void beforeTest() {
-    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
+    assertNotNull(wrapperCopy.options);
+    assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
   }
 
   @Test
-  public void testSerialization() {
-    MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options);
+  public void testSerializationWithUserCustomType() {
+    OptionsWrapper wrapper = new OptionsWrapper(
+        PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
+            .as(JacksonIncompatibleOptions.class));
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     FSStorageAgent.store(bos, wrapper);
 
     ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis);
+    OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
     assertNotNull(wrapperCopy.options);
-    assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
+    assertEquals("testValue",
+        wrapperCopy.options.get().as(JacksonIncompatibleOptions.class)
+            .getJacksonIncompatible().value);
+  }
+
+  /** PipelineOptions used to test auto registration of Jackson modules. */
+  public interface JacksonIncompatibleOptions extends ApexPipelineOptions {
+    JacksonIncompatible getJacksonIncompatible();
+    void setJacksonIncompatible(JacksonIncompatible value);
   }
 
+  /** A Jackson {@link Module} to test auto-registration of modules. */
+  @AutoService(Module.class)
+  public static class RegisteredTestModule extends SimpleModule {
+    public RegisteredTestModule() {
+      super("RegisteredTestModule");
+      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
+    }
+  }
+
+  /** A class which Jackson does not know how to serialize/deserialize. */
+  public static class JacksonIncompatible {
+    private final String value;
+    public JacksonIncompatible(String value) {
+      this.value = value;
+    }
+  }
+
+  /** A Jackson mixin used to add annotations to other classes. */
+  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+  public static final class JacksonIncompatibleMixin {}
+
+  /** A Jackson deserializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleDeserializer extends
+      JsonDeserializer<JacksonIncompatible> {
+
+    @Override
+    public JacksonIncompatible deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+    }
+  }
+
+  /** A Jackson serializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
+
+    @Override
+    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeString(jacksonIncompatible.value);
+    }
+  }
 }


[2/2] beam git commit: [BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules

Posted by lc...@apache.org.
[BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules

This closes #2880


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02b72d66
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02b72d66
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02b72d66

Branch: refs/heads/master
Commit: 02b72d6644c07b72a4c977a6cb16d59ec5a0ed8c
Parents: f43b61a 1f1c897
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 07:16:29 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:16:29 2017 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  4 +
 .../utils/SerializablePipelineOptions.java      | 15 ++-
 .../translation/utils/PipelineOptionsTest.java  | 98 ++++++++++++++++----
 3 files changed, 99 insertions(+), 18 deletions(-)
----------------------------------------------------------------------