You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 14:27:34 UTC
[1/2] beam git commit: [BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules
Repository: beam
Updated Branches:
refs/heads/master 02b72d664 -> 749b33f0b
[BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules
This also updates the runner harness and existing tests to use a properly constructed ObjectMapper for PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5729b58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5729b58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5729b58
Branch: refs/heads/master
Commit: e5729b58330a05e7be510710d0027c004704946b
Parents: 02b72d6
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 17:19:00 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:20:33 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 13 ++-
.../DataflowPipelineTranslatorTest.java | 83 ++++++++++++++++++++
.../options/DataflowProfilingOptionsTest.java | 4 +-
.../DataflowWorkerLoggingOptionsTest.java | 4 +-
.../apache/beam/sdk/options/ValueProviders.java | 4 +-
.../apache/beam/sdk/testing/TestPipeline.java | 4 +-
.../sdk/options/PipelineOptionsFactoryTest.java | 2 +-
.../sdk/options/ProxyInvocationHandlerTest.java | 4 +-
.../beam/sdk/options/ValueProviderTest.java | 19 +++--
.../beam/sdk/options/ValueProvidersTest.java | 19 ++---
.../beam/sdk/testing/TestPipelineTest.java | 6 +-
.../gcp/options/GoogleApiDebugOptionsTest.java | 8 +-
.../org/apache/beam/fn/harness/FnHarness.java | 5 +-
.../beam/runners/core/BeamFnDataReadRunner.java | 2 -
.../runners/core/BeamFnDataWriteRunner.java | 2 -
.../control/ProcessBundleHandlerTest.java | 2 -
.../runners/core/BeamFnDataReadRunnerTest.java | 2 -
.../runners/core/BeamFnDataWriteRunnerTest.java | 2 -
18 files changed, 143 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 69b4ecd..e727433 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -32,6 +32,7 @@ import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.AutoscalingSettings;
import com.google.api.services.dataflow.model.DataflowPackage;
@@ -72,6 +73,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -90,6 +92,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -111,6 +114,14 @@ public class DataflowPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
+ /**
+ * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
+ * for user specified configuration injection into the ObjectMapper. This supports user custom
+ * types on {@link PipelineOptions}.
+ */
+ private static final ObjectMapper MAPPER_WITH_MODULES = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
try {
return WindowingStrategies.toProto(windowingStrategy).toByteArray();
@@ -303,7 +314,7 @@ public class DataflowPipelineTranslator {
try {
environment.setSdkPipelineOptions(
- MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
+ MAPPER.readValue(MAPPER_WITH_MODULES.writeValueAsBytes(options), Map.class));
} catch (IOException e) {
throw new IllegalArgumentException(
"PipelineOptions specified failed to serialize to JSON.", e);
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 41f3c92..a6ad8c5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -37,11 +37,23 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -70,6 +82,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
@@ -217,6 +230,76 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0));
}
+ /** PipelineOptions used to test auto registration of Jackson modules. */
+ public interface JacksonIncompatibleOptions extends PipelineOptions {
+ JacksonIncompatible getJacksonIncompatible();
+ void setJacksonIncompatible(JacksonIncompatible value);
+ }
+
+ /** A Jackson {@link Module} to test auto-registration of modules. */
+ @AutoService(Module.class)
+ public static class RegisteredTestModule extends SimpleModule {
+ public RegisteredTestModule() {
+ super("RegisteredTestModule");
+ setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
+ }
+ }
+
+ /** A class which Jackson does not know how to serialize/deserialize. */
+ public static class JacksonIncompatible {
+ private final String value;
+ public JacksonIncompatible(String value) {
+ this.value = value;
+ }
+ }
+
+ /** A Jackson mixin used to add annotations to other classes. */
+ @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+ @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+ public static final class JacksonIncompatibleMixin {}
+
+ /** A Jackson deserializer for {@link JacksonIncompatible}. */
+ public static class JacksonIncompatibleDeserializer extends
+ JsonDeserializer<JacksonIncompatible> {
+
+ @Override
+ public JacksonIncompatible deserialize(JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+ return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+ }
+ }
+
+ /** A Jackson serializer for {@link JacksonIncompatible}. */
+ public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
+
+ @Override
+ public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+ jsonGenerator.writeString(jacksonIncompatible.value);
+ }
+ }
+
+ @Test
+ public void testSettingOfPipelineOptionsWithCustomUserType() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ options.as(JacksonIncompatibleOptions.class).setJacksonIncompatible(
+ new JacksonIncompatible("userCustomTypeTest"));
+
+ Pipeline p = Pipeline.create(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
+ assertThat(sdkPipelineOptions, hasKey("options"));
+ Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
+ assertThat(optionsMap, hasEntry("jacksonIncompatible", (Object) "userCustomTypeTest"));
+ }
+
@Test
public void testNetworkConfig() throws IOException {
final String testNetwork = "test-network";
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
index 4018cbb..68118a4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -33,7 +34,8 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class DataflowProfilingOptionsTest {
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
@Test
public void testOptionsObject() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
index b463dcb..b1a5258 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -32,7 +33,8 @@ import org.junit.runners.JUnit4;
/** Tests for {@link DataflowWorkerLoggingOptions}. */
@RunWith(JUnit4.class)
public class DataflowWorkerLoggingOptionsTest {
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
@Rule public ExpectedException expectedException = ExpectedException.none();
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index d034b81..e2355ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
* Utilities for working with the {@link ValueProvider} interface.
@@ -37,7 +38,8 @@ class ValueProviders {
*/
public static String updateSerializedOptions(
String serializedOptions, Map<String, String> runtimeValues) {
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
ObjectNode root, options;
try {
root = mapper.readValue(serializedOptions, ObjectNode.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index ab8772b..4d0cc2b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
@@ -240,7 +241,8 @@ public class TestPipeline extends Pipeline implements TestRule {
static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private Optional<? extends PipelineRunEnforcement> enforcement = Optional.absent();
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 11bd7b9..76a5f18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -1689,7 +1689,7 @@ public class PipelineOptionsFactoryTest {
}
/** PipelineOptions used to test auto registration of Jackson modules. */
- public interface JacksonIncompatibleOptions extends PipelineOptions {
+ interface JacksonIncompatibleOptions extends PipelineOptions {
JacksonIncompatible getJacksonIncompatible();
void setJacksonIncompatible(JacksonIncompatible value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 323c24c..2c43f57 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -77,7 +78,8 @@ public class ProxyInvocationHandlerTest {
}
};
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
/** A test interface with some primitives and objects. */
public interface Simple extends PipelineOptions {
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
index 9369ae6..e596cc1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -41,6 +42,8 @@ import org.junit.runners.JUnit4;
/** Tests for {@link ValueProvider}. */
@RunWith(JUnit4.class)
public class ValueProviderTest {
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
@Rule public ExpectedException expectedException = ExpectedException.none();
/** A test interface. */
@@ -118,8 +121,7 @@ public class ValueProviderTest {
@Test
public void testNoDefaultRuntimeProviderWithOverride() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- TestOptions runtime = mapper.readValue(
+ TestOptions runtime = MAPPER.readValue(
"{ \"options\": { \"foo\": \"quux\" }}", PipelineOptions.class)
.as(TestOptions.class);
@@ -134,8 +136,7 @@ public class ValueProviderTest {
@Test
public void testDefaultRuntimeProviderWithOverride() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- TestOptions runtime = mapper.readValue(
+ TestOptions runtime = MAPPER.readValue(
"{ \"options\": { \"bar\": \"quux\" }}", PipelineOptions.class)
.as(TestOptions.class);
@@ -196,12 +197,11 @@ public class ValueProviderTest {
public void testSerializeDeserializeNoArg() throws Exception {
TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
assertFalse(submitOptions.getFoo().isAccessible());
- ObjectMapper mapper = new ObjectMapper();
- String serializedOptions = mapper.writeValueAsString(submitOptions);
+ String serializedOptions = MAPPER.writeValueAsString(submitOptions);
String runnerString = ValueProviders.updateSerializedOptions(
serializedOptions, ImmutableMap.of("foo", "quux"));
- TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+ TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
.as(TestOptions.class);
ValueProvider<String> vp = runtime.getFoo();
@@ -215,12 +215,11 @@ public class ValueProviderTest {
TestOptions submitOptions = PipelineOptionsFactory.fromArgs("--foo=baz").as(TestOptions.class);
assertEquals("baz", submitOptions.getFoo().get());
assertTrue(submitOptions.getFoo().isAccessible());
- ObjectMapper mapper = new ObjectMapper();
- String serializedOptions = mapper.writeValueAsString(submitOptions);
+ String serializedOptions = MAPPER.writeValueAsString(submitOptions);
String runnerString = ValueProviders.updateSerializedOptions(
serializedOptions, ImmutableMap.of("foo", "quux"));
- TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+ TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
.as(TestOptions.class);
ValueProvider<String> vp = runtime.getFoo();
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
index 14f86bc..dd4d55b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -29,6 +30,9 @@ import org.junit.runners.JUnit4;
/** Tests for {@link ValueProviders}. */
@RunWith(JUnit4.class)
public class ValueProvidersTest {
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
/** A test interface. */
public interface TestOptions extends PipelineOptions {
String getString();
@@ -41,11 +45,10 @@ public class ValueProvidersTest {
@Test
public void testUpdateSerialize() throws Exception {
TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
- ObjectMapper mapper = new ObjectMapper();
- String serializedOptions = mapper.writeValueAsString(submitOptions);
+ String serializedOptions = MAPPER.writeValueAsString(submitOptions);
String updatedOptions = ValueProviders.updateSerializedOptions(
serializedOptions, ImmutableMap.of("string", "bar"));
- TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class)
+ TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class)
.as(TestOptions.class);
assertEquals("bar", runtime.getString());
}
@@ -54,11 +57,10 @@ public class ValueProvidersTest {
public void testUpdateSerializeExistingValue() throws Exception {
TestOptions submitOptions = PipelineOptionsFactory.fromArgs(
"--string=baz", "--otherString=quux").as(TestOptions.class);
- ObjectMapper mapper = new ObjectMapper();
- String serializedOptions = mapper.writeValueAsString(submitOptions);
+ String serializedOptions = MAPPER.writeValueAsString(submitOptions);
String updatedOptions = ValueProviders.updateSerializedOptions(
serializedOptions, ImmutableMap.of("string", "bar"));
- TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class)
+ TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class)
.as(TestOptions.class);
assertEquals("bar", runtime.getString());
assertEquals("quux", runtime.getOtherString());
@@ -67,11 +69,10 @@ public class ValueProvidersTest {
@Test
public void testUpdateSerializeEmptyUpdate() throws Exception {
TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
- ObjectMapper mapper = new ObjectMapper();
- String serializedOptions = mapper.writeValueAsString(submitOptions);
+ String serializedOptions = MAPPER.writeValueAsString(submitOptions);
String updatedOptions = ValueProviders.updateSerializedOptions(
serializedOptions, ImmutableMap.<String, String>of());
- TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class)
+ TestOptions runtime = MAPPER.readValue(updatedOptions, PipelineOptions.class)
.as(TestOptions.class);
assertNull(runtime.getString());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 04005c5..05abb59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
@@ -61,6 +62,8 @@ import org.junit.runners.Suite;
TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class
})
public class TestPipelineTest implements Serializable {
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
/** Tests related to the creation of a {@link TestPipeline}. */
@RunWith(JUnit4.class)
@@ -85,9 +88,8 @@ public class TestPipelineTest implements Serializable {
@Test
public void testCreationOfPipelineOptions() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
String stringOptions =
- mapper.writeValueAsString(
+ MAPPER.writeValueAsString(
new String[] {
"--runner=org.apache.beam.sdk.testing.CrashingRunner"
});
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
index 67d5880..68a29e6 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -35,6 +36,8 @@ import org.junit.runners.JUnit4;
/** Tests for {@link GoogleApiDebugOptions}. */
@RunWith(JUnit4.class)
public class GoogleApiDebugOptionsTest {
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
private static final String STORAGE_GET_TRACE =
"--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
private static final String STORAGE_GET_AND_LIST_TRACE =
@@ -139,9 +142,8 @@ public class GoogleApiDebugOptionsTest {
@Test
public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception {
String serializedValue = "{\"Api\":\"Token\"}";
- ObjectMapper objectMapper = new ObjectMapper();
assertEquals(serializedValue,
- objectMapper.writeValueAsString(
- objectMapper.readValue(serializedValue, GoogleApiTracer.class)));
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(serializedValue, GoogleApiTracer.class)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d587986..24f826c 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -34,6 +34,7 @@ import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +73,9 @@ public class FnHarness {
System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
- PipelineOptions options = new ObjectMapper().readValue(
+ ObjectMapper objectMapper = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+ PipelineOptions options = objectMapper.readValue(
System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor =
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 805d480..7c4a5e8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
@@ -47,7 +46,6 @@ import org.slf4j.LoggerFactory;
*/
public class BeamFnDataReadRunner<OutputT> {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 0ba09e3..3a11def 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core;
-import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
@@ -38,7 +37,6 @@ import org.apache.beam.sdk.values.KV;
* For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
*/
public class BeamFnDataWriteRunner<InputT> {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
private final BeamFnApi.Target outputTarget;
private final Coder<WindowedValue<InputT>> coder;
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 5987267..654f989 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -92,7 +91,6 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link ProcessBundleHandler}. */
@RunWith(JUnit4.class)
public class ProcessBundleHandlerTest {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Coder<WindowedValue<String>> STRING_CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
private static final String LONG_CODER_SPEC_ID = "998L";
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 0d036fe..04a3615 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -65,7 +64,6 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link BeamFnDataReadRunner}. */
@RunWith(JUnit4.class)
public class BeamFnDataReadRunnerTest {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/e5729b58/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 50fee7a..9e50cd0 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
import java.io.IOException;
import java.util.ArrayList;
@@ -53,7 +52,6 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link BeamFnDataWriteRunner}. */
@RunWith(JUnit4.class)
public class BeamFnDataWriteRunnerTest {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
[2/2] beam git commit: [BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules
Posted by lc...@apache.org.
[BEAM-2165] Update Dataflow to support serializing/deserializing custom user types configured via Jackson modules
This closes #2881
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/749b33f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/749b33f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/749b33f0
Branch: refs/heads/master
Commit: 749b33f0b74a9bcd3daf03ea7f9b4579baec2651
Parents: 02b72d6 e5729b5
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 07:27:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 07:27:17 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 13 ++-
.../DataflowPipelineTranslatorTest.java | 83 ++++++++++++++++++++
.../options/DataflowProfilingOptionsTest.java | 4 +-
.../DataflowWorkerLoggingOptionsTest.java | 4 +-
.../apache/beam/sdk/options/ValueProviders.java | 4 +-
.../apache/beam/sdk/testing/TestPipeline.java | 4 +-
.../sdk/options/PipelineOptionsFactoryTest.java | 2 +-
.../sdk/options/ProxyInvocationHandlerTest.java | 4 +-
.../beam/sdk/options/ValueProviderTest.java | 19 +++--
.../beam/sdk/options/ValueProvidersTest.java | 19 ++---
.../beam/sdk/testing/TestPipelineTest.java | 6 +-
.../gcp/options/GoogleApiDebugOptionsTest.java | 8 +-
.../org/apache/beam/fn/harness/FnHarness.java | 5 +-
.../beam/runners/core/BeamFnDataReadRunner.java | 2 -
.../runners/core/BeamFnDataWriteRunner.java | 2 -
.../control/ProcessBundleHandlerTest.java | 2 -
.../runners/core/BeamFnDataReadRunnerTest.java | 2 -
.../runners/core/BeamFnDataWriteRunnerTest.java | 2 -
18 files changed, 143 insertions(+), 42 deletions(-)
----------------------------------------------------------------------