You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:32:50 UTC
[1/2] beam git commit: [BEAM-2644] Introduces TestPipeline.newProvider()
Repository: beam
Updated Branches:
refs/heads/master 5c2cab017 -> f6c840533
[BEAM-2644] Introduces TestPipeline.newProvider()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b19b71
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b19b71
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b19b71
Branch: refs/heads/master
Commit: f1b19b71d2905079a4640d9fb89e02985ca6e873
Parents: 5c2cab0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Aug 23 19:13:46 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 12:14:34 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/options/ValueProvider.java | 10 ++--
.../apache/beam/sdk/options/ValueProviders.java | 15 +++---
.../apache/beam/sdk/testing/TestPipeline.java | 49 +++++++++++++++++++-
.../sdk/transforms/display/DisplayData.java | 5 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++++--
.../sdk/options/ProxyInvocationHandlerTest.java | 4 +-
.../beam/sdk/options/ValueProviderTest.java | 23 +++++----
.../beam/sdk/testing/TestPipelineTest.java | 37 ++++++++++++++-
.../apache/beam/sdk/transforms/CreateTest.java | 22 ++-------
9 files changed, 127 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index 15413e8..3e6a24b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -41,6 +41,7 @@ import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
@@ -54,18 +55,21 @@ import org.apache.beam.sdk.values.PCollection;
* <p>A common task is to create a {@link PCollection} containing the value of this
* {@link ValueProvider} regardless of whether it's accessible at construction time or not.
* For that, use {@link Create#ofProvider}.
+ *
+ * <p>For unit-testing a transform against a {@link ValueProvider} that only becomes available
+ * at runtime, use {@link TestPipeline#newProvider}.
*/
@JsonSerialize(using = ValueProvider.Serializer.class)
@JsonDeserialize(using = ValueProvider.Deserializer.class)
public interface ValueProvider<T> extends Serializable {
/**
- * Return the value wrapped by this {@link ValueProvider}.
+ * Returns the runtime value wrapped by this {@link ValueProvider} in case it is {@link
+ * #isAccessible}, otherwise fails.
*/
T get();
/**
- * Whether the contents of this {@link ValueProvider} is available to
- * routines that run at graph construction time.
+ * Whether the contents of this {@link ValueProvider} is currently available via {@link #get}.
*/
boolean isAccessible();
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index 2fffffa..9345462 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -22,17 +22,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.Map;
+import org.apache.beam.sdk.testing.TestPipeline;
-/**
- * Utilities for working with the {@link ValueProvider} interface.
- */
+/** Utilities for working with the {@link ValueProvider} interface. */
public class ValueProviders {
private ValueProviders() {}
/**
- * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates
- * the values according to the provided values in {@code runtimeValues}.
+ * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates the
+ * values according to the provided values in {@code runtimeValues}.
+ *
+ * @deprecated Use {@link TestPipeline#newProvider} for testing {@link ValueProvider} code.
*/
+ @Deprecated
public static String updateSerializedOptions(
String serializedOptions, Map<String, String> runtimeValues) {
ObjectNode root, options;
@@ -41,8 +43,7 @@ public class ValueProviders {
options = (ObjectNode) root.get("options");
checkNotNull(options, "Unable to locate 'options' in %s", serializedOptions);
} catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to parse %s", serializedOptions), e);
+ throw new RuntimeException(String.format("Unable to parse %s", serializedOptions), e);
}
for (Map.Entry<String, String> entry : runtimeValues.entrySet()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index b67b14f..be2f193 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -31,15 +31,19 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
@@ -49,7 +53,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
@@ -341,7 +348,12 @@ public class TestPipeline extends Pipeline implements TestRule {
final PipelineResult pipelineResult;
try {
enforcement.get().beforePipelineExecution();
- pipelineResult = super.run(options);
+ PipelineOptions updatedOptions =
+ MAPPER.convertValue(MAPPER.valueToTree(options), PipelineOptions.class);
+ updatedOptions
+ .as(TestValueProviderOptions.class)
+ .setProviderRuntimeValues(StaticValueProvider.of(providerRuntimeValues));
+ pipelineResult = super.run(updatedOptions);
verifyPAssertsSucceeded(this, pipelineResult);
} catch (RuntimeException exc) {
Throwable cause = exc.getCause();
@@ -358,6 +370,41 @@ public class TestPipeline extends Pipeline implements TestRule {
return pipelineResult;
}
+ /** Implementation detail of {@link #newProvider}, do not use. */
+ @Internal
+ public interface TestValueProviderOptions extends PipelineOptions {
+ ValueProvider<Map<String, Object>> getProviderRuntimeValues();
+ void setProviderRuntimeValues(ValueProvider<Map<String, Object>> runtimeValues);
+ }
+
+ /**
+ * Returns a new {@link ValueProvider} that is inaccessible before {@link #run}, but will be
+ * accessible while the pipeline runs.
+ */
+ public <T> ValueProvider<T> newProvider(T runtimeValue) {
+ String uuid = UUID.randomUUID().toString();
+ providerRuntimeValues.put(uuid, runtimeValue);
+ return ValueProvider.NestedValueProvider.of(
+ options.as(TestValueProviderOptions.class).getProviderRuntimeValues(),
+ new GetFromRuntimeValues<T>(uuid));
+ }
+
+ private final Map<String, Object> providerRuntimeValues = Maps.newHashMap();
+
+ private static class GetFromRuntimeValues<T>
+ implements SerializableFunction<Map<String, Object>, T> {
+ private final String key;
+
+ private GetFromRuntimeValues(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public T apply(Map<String, Object> input) {
+ return (T) input.get(key);
+ }
+ }
+
/**
* Enables the abandoned node detection. Abandoned nodes are <code>PTransforms</code>, <code>
* PAsserts</code> included, that were not executed by the pipeline runner. Abandoned nodes are
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 10ef428..917c070 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -796,8 +796,9 @@ public class DisplayData implements Serializable {
// Don't re-wrap exceptions recursively.
throw e;
} catch (Throwable e) {
- String msg = String.format("Error while populating display data for component: %s",
- namespace.getName());
+ String msg = String.format(
+ "Error while populating display data for component '%s': %s",
+ namespace.getName(), e.getMessage());
throw new PopulateDisplayDataException(msg, e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index f49443d..8870dd8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -65,7 +65,6 @@ import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -226,14 +225,19 @@ public class AvroIOTest {
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
- ValueProvider<String> pathProvider = StaticValueProvider.of(outputFile.getAbsolutePath());
-
writePipeline
.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class).to(pathProvider).withoutSharding());
+ .apply(
+ AvroIO.write(GenericClass.class)
+ .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+ .withoutSharding());
writePipeline.run().waitUntilFinish();
- PAssert.that(readPipeline.apply("Read", AvroIO.read(GenericClass.class).from(pathProvider)))
+ PAssert.that(
+ readPipeline.apply(
+ "Read",
+ AvroIO.read(GenericClass.class)
+ .from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
.containsInAnyOrder(values);
readPipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index fb0a0d7..fe8a0f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -24,7 +24,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -62,7 +61,6 @@ import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;
@@ -797,7 +795,7 @@ public class ProxyInvocationHandlerTest {
expectedException.expectMessage(
ProxyInvocationHandler.PipelineOptionsDisplayData.class.getName());
- expectedException.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
+ expectedException.expectMessage("oh noes!!");
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
index 7bbbf7e..51a92e3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -194,16 +194,16 @@ public class ValueProviderTest {
StaticValueProvider<String> provider = options.getBar();
}
+
@Test
public void testSerializeDeserializeNoArg() throws Exception {
TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
assertFalse(submitOptions.getFoo().isAccessible());
- String serializedOptions = MAPPER.writeValueAsString(submitOptions);
- String runnerString = ValueProviders.updateSerializedOptions(
- serializedOptions, ImmutableMap.of("foo", "quux"));
- TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
- .as(TestOptions.class);
+ ObjectNode root = MAPPER.valueToTree(submitOptions);
+ ((ObjectNode) root.get("options")).put("foo", "quux");
+ TestOptions runtime =
+ MAPPER.convertValue(root, PipelineOptions.class).as(TestOptions.class);
ValueProvider<String> vp = runtime.getFoo();
assertTrue(vp.isAccessible());
@@ -214,14 +214,13 @@ public class ValueProviderTest {
@Test
public void testSerializeDeserializeWithArg() throws Exception {
TestOptions submitOptions = PipelineOptionsFactory.fromArgs("--foo=baz").as(TestOptions.class);
- assertEquals("baz", submitOptions.getFoo().get());
assertTrue(submitOptions.getFoo().isAccessible());
- String serializedOptions = MAPPER.writeValueAsString(submitOptions);
+ assertEquals("baz", submitOptions.getFoo().get());
- String runnerString = ValueProviders.updateSerializedOptions(
- serializedOptions, ImmutableMap.of("foo", "quux"));
- TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
- .as(TestOptions.class);
+ ObjectNode root = MAPPER.valueToTree(submitOptions);
+ ((ObjectNode) root.get("options")).put("foo", "quux");
+ TestOptions runtime =
+ MAPPER.convertValue(root, PipelineOptions.class).as(TestOptions.class);
ValueProvider<String> vp = runtime.getFoo();
assertTrue(vp.isAccessible());
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 664f2f4..ec681ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.testing;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -37,8 +38,10 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
@@ -59,7 +62,8 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
TestPipelineTest.TestPipelineCreationTest.class,
TestPipelineTest.TestPipelineEnforcementsTest.WithRealPipelineRunner.class,
- TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class
+ TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class,
+ TestPipelineTest.NewProviderTest.class
})
public class TestPipelineTest implements Serializable {
private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
@@ -337,4 +341,35 @@ public class TestPipelineTest implements Serializable {
}
}
}
+
+ /** Tests for {@link TestPipeline#newProvider}. */
+ @RunWith(JUnit4.class)
+ public static class NewProviderTest implements Serializable {
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testNewProvider() {
+ ValueProvider<String> foo = pipeline.newProvider("foo");
+ ValueProvider<String> foobar =
+ ValueProvider.NestedValueProvider.of(
+ foo,
+ new SerializableFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input + "bar";
+ }
+ });
+
+ assertFalse(foo.isAccessible());
+ assertFalse(foobar.isAccessible());
+
+ PAssert.that(pipeline.apply("create foo", Create.ofProvider(foo, StringUtf8Coder.of())))
+ .containsInAnyOrder("foo");
+ PAssert.that(pipeline.apply("create foobar", Create.ofProvider(foobar, StringUtf8Coder.of())))
+ .containsInAnyOrder("foobar");
+
+ pipeline.run();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 81ad947..1c7e1af 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -25,9 +25,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
@@ -52,7 +50,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.options.ValueProviders;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -60,7 +57,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -355,9 +351,6 @@ public class CreateTest {
p.run();
}
- private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
- ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-
/** Testing options for {@link #testCreateOfProvider()}. */
public interface CreateOfProviderOptions extends PipelineOptions {
ValueProvider<String> getFoo();
@@ -385,19 +378,12 @@ public class CreateTest {
}),
StringUtf8Coder.of())))
.containsInAnyOrder("foobar");
- CreateOfProviderOptions submitOptions =
- p.getOptions().as(CreateOfProviderOptions.class);
PAssert.that(
- p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of())))
- .containsInAnyOrder("runtime foo");
-
- String serializedOptions = MAPPER.writeValueAsString(p.getOptions());
- String runnerString = ValueProviders.updateSerializedOptions(
- serializedOptions, ImmutableMap.of("foo", "runtime foo"));
- CreateOfProviderOptions runtimeOptions =
- MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class);
+ p.apply(
+ "Runtime", Create.ofProvider(p.newProvider("runtimeFoo"), StringUtf8Coder.of())))
+ .containsInAnyOrder("runtimeFoo");
- p.run(runtimeOptions);
+ p.run();
}
[2/2] beam git commit: This closes #3753: [BEAM-2644] Introduces TestPipeline.newProvider()
Posted by jk...@apache.org.
This closes #3753: [BEAM-2644] Introduces TestPipeline.newProvider()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6c84053
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6c84053
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6c84053
Branch: refs/heads/master
Commit: f6c840533fbfce6e4aec87bbfc3d2ce813a7131d
Parents: 5c2cab0 f1b19b7
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 30 12:15:19 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 12:15:19 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/options/ValueProvider.java | 10 ++--
.../apache/beam/sdk/options/ValueProviders.java | 15 +++---
.../apache/beam/sdk/testing/TestPipeline.java | 49 +++++++++++++++++++-
.../sdk/transforms/display/DisplayData.java | 5 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++++--
.../sdk/options/ProxyInvocationHandlerTest.java | 4 +-
.../beam/sdk/options/ValueProviderTest.java | 23 +++++----
.../beam/sdk/testing/TestPipelineTest.java | 37 ++++++++++++++-
.../apache/beam/sdk/transforms/CreateTest.java | 22 ++-------
9 files changed, 127 insertions(+), 52 deletions(-)
----------------------------------------------------------------------