You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 14:16:40 UTC
[1/2] beam git commit: [BEAM-2165] Update Apex to support
serializing/deserializing custom user types configured via Jackson modules
Repository: beam
Updated Branches:
refs/heads/master f43b61af4 -> 02b72d664
[BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f1c8972
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f1c8972
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f1c8972
Branch: refs/heads/master
Commit: 1f1c897264ea7ab050c8644344f6e2648af9ae4a
Parents: f43b61a
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 17:17:11 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:16:04 2017 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 4 +
.../utils/SerializablePipelineOptions.java | 15 ++-
.../translation/utils/PipelineOptionsTest.java | 98 ++++++++++++++++----
3 files changed, 99 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 36252e8..aa4bddf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -63,6 +63,10 @@
<version>${apex.malhar.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 1a47ed5..14476b5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.apex.translation.utils;
+import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Externalizable;
import java.io.IOException;
@@ -27,6 +28,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
* A wrapper to enable serialization of {@link PipelineOptions}.
@@ -51,13 +53,13 @@ public class SerializablePipelineOptions implements Externalizable {
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
+ out.writeUTF(createMapper().writeValueAsString(pipelineOptions));
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
String s = in.readUTF();
- this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
+ this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class)
.as(ApexPipelineOptions.class);
if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
@@ -66,4 +68,13 @@ public class SerializablePipelineOptions implements Externalizable {
}
}
+ /**
+ * Creates an {@link ObjectMapper} with every Jackson {@link Module} found on the class path
+ * registered, so that user-specified configuration can be injected into the mapper. This
+ * supports custom user types on {@link PipelineOptions}.
+ */
+ private static ObjectMapper createMapper() {
+ return new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f1c8972/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
index d5eb9a9..118ff99 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
@@ -23,15 +23,25 @@ import static org.junit.Assert.assertNotNull;
import com.datatorrent.common.util.FSStorageAgent;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-
+import java.io.IOException;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -49,36 +59,92 @@ public class PipelineOptionsTest {
void setTestOption(String value);
}
- private static class MyOptionsWrapper {
- private MyOptionsWrapper() {
+ private static class OptionsWrapper {
+ private OptionsWrapper() {
this(null); // required for Kryo
}
- private MyOptionsWrapper(ApexPipelineOptions options) {
+ private OptionsWrapper(ApexPipelineOptions options) {
this.options = new SerializablePipelineOptions(options);
}
@Bind(JavaSerializer.class)
private final SerializablePipelineOptions options;
}
- private static MyOptions options;
-
- private static final String[] args = new String[]{"--testOption=nothing"};
+ @Test
+ public void testSerialization() {
+ OptionsWrapper wrapper = new OptionsWrapper(
+ PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FSStorageAgent.store(bos, wrapper);
- @BeforeClass
- public static void beforeTest() {
- options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
+ assertNotNull(wrapperCopy.options);
+ assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
}
@Test
- public void testSerialization() {
- MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options);
+ public void testSerializationWithUserCustomType() {
+ OptionsWrapper wrapper = new OptionsWrapper(
+ PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
+ .as(JacksonIncompatibleOptions.class));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
FSStorageAgent.store(bos, wrapper);
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis);
+ OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
assertNotNull(wrapperCopy.options);
- assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
+ assertEquals("testValue",
+ wrapperCopy.options.get().as(JacksonIncompatibleOptions.class)
+ .getJacksonIncompatible().value);
+ }
+
+ /** PipelineOptions used to test auto-registration of Jackson modules. */
+ public interface JacksonIncompatibleOptions extends ApexPipelineOptions {
+ JacksonIncompatible getJacksonIncompatible();
+ void setJacksonIncompatible(JacksonIncompatible value);
}
+ /** A Jackson {@link Module} to test auto-registration of modules. */
+ @AutoService(Module.class)
+ public static class RegisteredTestModule extends SimpleModule {
+ public RegisteredTestModule() {
+ super("RegisteredTestModule");
+ setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
+ }
+ }
+
+ /** A class which Jackson does not know how to serialize/deserialize. */
+ public static class JacksonIncompatible {
+ private final String value;
+ public JacksonIncompatible(String value) {
+ this.value = value;
+ }
+ }
+
+ /** A Jackson mixin used to add annotations to other classes. */
+ @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+ @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+ public static final class JacksonIncompatibleMixin {}
+
+ /** A Jackson deserializer for {@link JacksonIncompatible}. */
+ public static class JacksonIncompatibleDeserializer extends
+ JsonDeserializer<JacksonIncompatible> {
+
+ @Override
+ public JacksonIncompatible deserialize(JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+ return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+ }
+ }
+
+ /** A Jackson serializer for {@link JacksonIncompatible}. */
+ public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
+
+ @Override
+ public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+ jsonGenerator.writeString(jacksonIncompatible.value);
+ }
+ }
}
[2/2] beam git commit: [BEAM-2165] Update Apex to support
serializing/deserializing custom user types configured via Jackson modules
Posted by lc...@apache.org.
[BEAM-2165] Update Apex to support serializing/deserializing custom user types configured via Jackson modules
This closes #2880
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02b72d66
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02b72d66
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02b72d66
Branch: refs/heads/master
Commit: 02b72d6644c07b72a4c977a6cb16d59ec5a0ed8c
Parents: f43b61a 1f1c897
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 07:16:29 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:16:29 2017 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 4 +
.../utils/SerializablePipelineOptions.java | 15 ++-
.../translation/utils/PipelineOptionsTest.java | 98 ++++++++++++++++----
3 files changed, 99 insertions(+), 18 deletions(-)
----------------------------------------------------------------------