You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 14:27:34 UTC

[1/2] beam git commit: [BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules

Repository: beam
Updated Branches:
  refs/heads/master 02b72d664 -> 749b33f0b


[BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules

This also updates the runner harness and existing tests to use a properly constructed ObjectMapper for PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5729b58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5729b58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5729b58

Branch: refs/heads/master
Commit: e5729b58330a05e7be510710d0027c004704946b
Parents: 02b72d6
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 17:19:00 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:20:33 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 13 ++-
 .../DataflowPipelineTranslatorTest.java         | 83 ++++++++++++++++++++
 .../options/DataflowProfilingOptionsTest.java   |  4 +-
 .../DataflowWorkerLoggingOptionsTest.java       |  4 +-
 .../apache/beam/sdk/options/ValueProviders.java |  4 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  4 +-
 .../sdk/options/PipelineOptionsFactoryTest.java |  2 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |  4 +-
 .../beam/sdk/options/ValueProviderTest.java     | 19 +++--
 .../beam/sdk/options/ValueProvidersTest.java    | 19 ++---
 .../beam/sdk/testing/TestPipelineTest.java      |  6 +-
 .../gcp/options/GoogleApiDebugOptionsTest.java  |  8 +-
 .../org/apache/beam/fn/harness/FnHarness.java   |  5 +-
 .../beam/runners/core/BeamFnDataReadRunner.java |  2 -
 .../runners/core/BeamFnDataWriteRunner.java     |  2 -
 .../control/ProcessBundleHandlerTest.java       |  2 -
 .../runners/core/BeamFnDataReadRunnerTest.java  |  2 -
 .../runners/core/BeamFnDataWriteRunnerTest.java |  2 -
 18 files changed, 143 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 69b4ecd..e727433 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -32,6 +32,7 @@ import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.dataflow.model.AutoscalingSettings;
 import com.google.api.services.dataflow.model.DataflowPackage;
@@ -72,6 +73,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -90,6 +92,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -111,6 +114,14 @@ public class DataflowPipelineTranslator {
   private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  /**
+   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
+   * for user specified configuration injection into the ObjectMapper. This supports user custom
+   * types on {@link PipelineOptions}.
+   */
+  private static final ObjectMapper MAPPER_WITH_MODULES = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
   private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
     try {
       return WindowingStrategies.toProto(windowingStrategy).toByteArray();
@@ -303,7 +314,7 @@ public class DataflowPipelineTranslator {
 
       try {
         environment.setSdkPipelineOptions(
-            MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
+            MAPPER.readValue(MAPPER_WITH_MODULES.writeValueAsBytes(options), Map.class));
       } catch (IOException e) {
         throw new IllegalArgumentException(
             "PipelineOptions specified failed to serialize to JSON.", e);

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 41f3c92..a6ad8c5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -37,11 +37,23 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -70,6 +82,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Count;
@@ -217,6 +230,76 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0));
   }
 
+  /** PipelineOptions used to test auto registration of Jackson modules. */
+  public interface JacksonIncompatibleOptions extends PipelineOptions {
+    JacksonIncompatible getJacksonIncompatible();
+    void setJacksonIncompatible(JacksonIncompatible value);
+  }
+
+  /** A Jackson {@link Module} to test auto-registration of modules. */
+  @AutoService(Module.class)
+  public static class RegisteredTestModule extends SimpleModule {
+    public RegisteredTestModule() {
+      super("RegisteredTestModule");
+      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
+    }
+  }
+
+  /** A class which Jackson does not know how to serialize/deserialize. */
+  public static class JacksonIncompatible {
+    private final String value;
+    public JacksonIncompatible(String value) {
+      this.value = value;
+    }
+  }
+
+  /** A Jackson mixin used to add annotations to other classes. */
+  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+  public static final class JacksonIncompatibleMixin {}
+
+  /** A Jackson deserializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleDeserializer extends
+      JsonDeserializer<JacksonIncompatible> {
+
+    @Override
+    public JacksonIncompatible deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+    }
+  }
+
+  /** A Jackson serializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
+
+    @Override
+    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeString(jacksonIncompatible.value);
+    }
+  }
+
+  @Test
+  public void testSettingOfPipelineOptionsWithCustomUserType() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    options.as(JacksonIncompatibleOptions.class).setJacksonIncompatible(
+        new JacksonIncompatible("userCustomTypeTest"));
+
+    Pipeline p = Pipeline.create(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
+    assertThat(sdkPipelineOptions, hasKey("options"));
+    Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
+    assertThat(optionsMap, hasEntry("jacksonIncompatible", (Object) "userCustomTypeTest"));
+  }
+
   @Test
   public void testNetworkConfig() throws IOException {
     final String testNetwork = "test-network";

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
index 4018cbb..68118a4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -33,7 +34,8 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class DataflowProfilingOptionsTest {
 
-  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
   @Test
   public void testOptionsObject() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
index b463dcb..b1a5258 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -32,7 +33,8 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link DataflowWorkerLoggingOptions}. */
 @RunWith(JUnit4.class)
 public class DataflowWorkerLoggingOptionsTest {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index d034b81..e2355ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
 import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Utilities for working with the {@link ValueProvider} interface.
@@ -37,7 +38,8 @@ class ValueProviders {
    */
   public static String updateSerializedOptions(
       String serializedOptions, Map<String, String> runtimeValues) {
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
     ObjectNode root, options;
     try {
       root = mapper.readValue(serializedOptions, ObjectNode.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index ab8772b..4d0cc2b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
@@ -240,7 +241,8 @@ public class TestPipeline extends Pipeline implements TestRule {
 
   static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
   static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
-  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   private Optional<? extends PipelineRunEnforcement> enforcement = Optional.absent();

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 11bd7b9..76a5f18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -1689,7 +1689,7 @@ public class PipelineOptionsFactoryTest {
   }
 
   /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends PipelineOptions {
+  interface JacksonIncompatibleOptions extends PipelineOptions {
     JacksonIncompatible getJacksonIncompatible();
     void setJacksonIncompatible(JacksonIncompatible value);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 323c24c..2c43f57 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -77,7 +78,8 @@ public class ProxyInvocationHandlerTest {
     }
   };
 
-  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
   /** A test interface with some primitives and objects. */
   public interface Simple extends PipelineOptions {

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
index 9369ae6..e596cc1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -41,6 +42,8 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link ValueProvider}. */
 @RunWith(JUnit4.class)
 public class ValueProviderTest {
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
   /** A test interface. */
@@ -118,8 +121,7 @@ public class ValueProviderTest {
 
   @Test
   public void testNoDefaultRuntimeProviderWithOverride() throws Exception {
-    ObjectMapper mapper = new ObjectMapper();
-    TestOptions runtime = mapper.readValue(
+    TestOptions runtime = MAPPER.readValue(
       "{ \"options\": { \"foo\": \"quux\" }}", PipelineOptions.class)
       .as(TestOptions.class);
 
@@ -134,8 +136,7 @@ public class ValueProviderTest {
 
   @Test
   public void testDefaultRuntimeProviderWithOverride() throws Exception {
-    ObjectMapper mapper = new ObjectMapper();
-    TestOptions runtime = mapper.readValue(
+    TestOptions runtime = MAPPER.readValue(
       "{ \"options\": { \"bar\": \"quux\" }}", PipelineOptions.class)
       .as(TestOptions.class);
 
@@ -196,12 +197,11 @@ public class ValueProviderTest {
   public void testSerializeDeserializeNoArg() throws Exception {
     TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
     assertFalse(submitOptions.getFoo().isAccessible());
-    ObjectMapper mapper = new ObjectMapper();
-    String serializedOptions = mapper.writeValueAsString(submitOptions);
+    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
 
     String runnerString = ValueProviders.updateSerializedOptions(
       serializedOptions, ImmutableMap.of("foo", "quux"));
-    TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+    TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
       .as(TestOptions.class);
 
     ValueProvider<String> vp = runtime.getFoo();
@@ -215,12 +215,11 @@ public class ValueProviderTest {
     TestOptions submitOptions = PipelineOptionsFactory.fromArgs("--foo=baz").as(TestOptions.class);
     assertEquals("baz", submitOptions.getFoo().get());
     assertTrue(submitOptions.getFoo().isAccessible());
-    ObjectMapper mapper = new ObjectMapper();
-    String serializedOptions = mapper.writeValueAsString(submitOptions);
+    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
 
     String runnerString = ValueProviders.updateSerializedOptions(
       serializedOptions, ImmutableMap.of("foo", "quux"));
-    TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+    TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
       .as(TestOptions.class);
 
     ValueProvider<String> vp = runtime.getFoo();

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
index 14f86bc..dd4d55b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -29,6 +30,9 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link ValueProviders}. */
 @RunWith(JUnit4.class)
 public class ValueProvidersTest {
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
   /** A test interface. */
   public interface TestOptions extends PipelineOptions {
     String getString();
@@ -41,11 +45,10 @@ public class ValueProvidersTest {
   @Test
   public void testUpdateSerialize() throws Exception {
     TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
-    ObjectMapper mapper = new ObjectMapper();
-    String serializedOptions = mapper.writeValueAsString(submitOptions);
+    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
     String updatedOptions = ValueProviders.updateSerializedOptions(
       serializedOptions, ImmutableMap.of("string", "bar"));
-    TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class)
+    TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class)
       .as(TestOptions.class);
     assertEquals("bar", runtime.getString());
   }
@@ -54,11 +57,10 @@ public class ValueProvidersTest {
   public void testUpdateSerializeExistingValue() throws Exception {
     TestOptions submitOptions = PipelineOptionsFactory.fromArgs(
         "--string=baz", "--otherString=quux").as(TestOptions.class);
-    ObjectMapper mapper = new ObjectMapper();
-    String serializedOptions = mapper.writeValueAsString(submitOptions);
+    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
     String updatedOptions = ValueProviders.updateSerializedOptions(
       serializedOptions, ImmutableMap.of("string", "bar"));
-    TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class)
+    TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class)
       .as(TestOptions.class);
     assertEquals("bar", runtime.getString());
     assertEquals("quux", runtime.getOtherString());
@@ -67,11 +69,10 @@ public class ValueProvidersTest {
   @Test
   public void testUpdateSerializeEmptyUpdate() throws Exception {
     TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
-    ObjectMapper mapper = new ObjectMapper();
-    String serializedOptions = mapper.writeValueAsString(submitOptions);
+    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
     String updatedOptions = ValueProviders.updateSerializedOptions(
       serializedOptions, ImmutableMap.<String, String>of());
-    TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class)
+    TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class)
       .as(TestOptions.class);
     assertNull(runtime.getString());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 04005c5..05abb59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -61,6 +62,8 @@ import org.junit.runners.Suite;
   TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class
 })
 public class TestPipelineTest implements Serializable {
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
   /** Tests related to the creation of a {@link TestPipeline}. */
   @RunWith(JUnit4.class)
@@ -85,9 +88,8 @@ public class TestPipelineTest implements Serializable {
 
     @Test
     public void testCreationOfPipelineOptions() throws Exception {
-      ObjectMapper mapper = new ObjectMapper();
       String stringOptions =
-          mapper.writeValueAsString(
+          MAPPER.writeValueAsString(
               new String[] {
                 "--runner=org.apache.beam.sdk.testing.CrashingRunner"
               });

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
index 67d5880..68a29e6 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -35,6 +36,8 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link GoogleApiDebugOptions}. */
 @RunWith(JUnit4.class)
 public class GoogleApiDebugOptionsTest {
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
   private static final String STORAGE_GET_TRACE =
       "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
   private static final String STORAGE_GET_AND_LIST_TRACE =
@@ -139,9 +142,8 @@ public class GoogleApiDebugOptionsTest {
   @Test
   public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception {
     String serializedValue = "{\"Api\":\"Token\"}";
-    ObjectMapper objectMapper = new ObjectMapper();
     assertEquals(serializedValue,
-        objectMapper.writeValueAsString(
-            objectMapper.readValue(serializedValue, GoogleApiTracer.class)));
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(serializedValue, GoogleApiTracer.class)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d587986..24f826c 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -34,6 +34,7 @@ import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +73,9 @@ public class FnHarness {
     System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
     System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
 
-    PipelineOptions options = new ObjectMapper().readValue(
+    ObjectMapper objectMapper = new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+    PipelineOptions options = objectMapper.readValue(
         System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
 
     BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor =

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 805d480..7c4a5e8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.core;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
@@ -47,7 +46,6 @@ import org.slf4j.LoggerFactory;
  */
 public class BeamFnDataReadRunner<OutputT> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
   private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 0ba09e3..3a11def 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.core;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
@@ -38,7 +37,6 @@ import org.apache.beam.sdk.values.KV;
  * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
  */
 public class BeamFnDataWriteRunner<InputT> {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
   private final BeamFnApi.Target outputTarget;
   private final Coder<WindowedValue<InputT>> coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 5987267..654f989 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
@@ -92,7 +91,6 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link ProcessBundleHandler}. */
 @RunWith(JUnit4.class)
 public class ProcessBundleHandlerTest {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final Coder<WindowedValue<String>> STRING_CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
   private static final String LONG_CODER_SPEC_ID = "998L";

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 0d036fe..04a3615 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -65,7 +64,6 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link BeamFnDataReadRunner}. */
 @RunWith(JUnit4.class)
 public class BeamFnDataReadRunnerTest {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 50fee7a..9e50cd0 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Any;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -53,7 +52,6 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link BeamFnDataWriteRunner}. */
 @RunWith(JUnit4.class)
 public class BeamFnDataWriteRunnerTest {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()


[2/2] beam git commit: [BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules

Posted by lc...@apache.org.
[BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules

This closes #2881


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/749b33f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/749b33f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/749b33f0

Branch: refs/heads/master
Commit: 749b33f0b74a9bcd3daf03ea7f9b4579baec2651
Parents: 02b72d6 e5729b5
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 07:27:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:27:17 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 13 ++-
 .../DataflowPipelineTranslatorTest.java         | 83 ++++++++++++++++++++
 .../options/DataflowProfilingOptionsTest.java   |  4 +-
 .../DataflowWorkerLoggingOptionsTest.java       |  4 +-
 .../apache/beam/sdk/options/ValueProviders.java |  4 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  4 +-
 .../sdk/options/PipelineOptionsFactoryTest.java |  2 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |  4 +-
 .../beam/sdk/options/ValueProviderTest.java     | 19 +++--
 .../beam/sdk/options/ValueProvidersTest.java    | 19 ++---
 .../beam/sdk/testing/TestPipelineTest.java      |  6 +-
 .../gcp/options/GoogleApiDebugOptionsTest.java  |  8 +-
 .../org/apache/beam/fn/harness/FnHarness.java   |  5 +-
 .../beam/runners/core/BeamFnDataReadRunner.java |  2 -
 .../runners/core/BeamFnDataWriteRunner.java     |  2 -
 .../control/ProcessBundleHandlerTest.java       |  2 -
 .../runners/core/BeamFnDataReadRunnerTest.java  |  2 -
 .../runners/core/BeamFnDataWriteRunnerTest.java |  2 -
 18 files changed, 143 insertions(+), 42 deletions(-)
----------------------------------------------------------------------