You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:32:50 UTC

[1/2] beam git commit: [BEAM-2644] Introduces TestPipeline.newProvider()

Repository: beam
Updated Branches:
  refs/heads/master 5c2cab017 -> f6c840533


[BEAM-2644] Introduces TestPipeline.newProvider()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b19b71
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b19b71
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b19b71

Branch: refs/heads/master
Commit: f1b19b71d2905079a4640d9fb89e02985ca6e873
Parents: 5c2cab0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Aug 23 19:13:46 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 12:14:34 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/options/ValueProvider.java  | 10 ++--
 .../apache/beam/sdk/options/ValueProviders.java | 15 +++---
 .../apache/beam/sdk/testing/TestPipeline.java   | 49 +++++++++++++++++++-
 .../sdk/transforms/display/DisplayData.java     |  5 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++++--
 .../sdk/options/ProxyInvocationHandlerTest.java |  4 +-
 .../beam/sdk/options/ValueProviderTest.java     | 23 +++++----
 .../beam/sdk/testing/TestPipelineTest.java      | 37 ++++++++++++++-
 .../apache/beam/sdk/transforms/CreateTest.java  | 22 ++-------
 9 files changed, 127 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index 15413e8..3e6a24b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -41,6 +41,7 @@ import java.lang.reflect.Proxy;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
@@ -54,18 +55,21 @@ import org.apache.beam.sdk.values.PCollection;
  * <p>A common task is to create a {@link PCollection} containing the value of this
  * {@link ValueProvider} regardless of whether it's accessible at construction time or not.
  * For that, use {@link Create#ofProvider}.
+ *
+ * <p>For unit-testing a transform against a {@link ValueProvider} that only becomes available
+ * at runtime, use {@link TestPipeline#newProvider}.
  */
 @JsonSerialize(using = ValueProvider.Serializer.class)
 @JsonDeserialize(using = ValueProvider.Deserializer.class)
 public interface ValueProvider<T> extends Serializable {
   /**
-   * Return the value wrapped by this {@link ValueProvider}.
+   * Returns the runtime value wrapped by this {@link ValueProvider} if it is {@link
+   * #isAccessible accessible}; otherwise fails.
    */
   T get();
 
   /**
-   * Whether the contents of this {@link ValueProvider} is available to
-   * routines that run at graph construction time.
+   * Whether the contents of this {@link ValueProvider} are currently available via {@link #get}.
    */
   boolean isAccessible();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index 2fffffa..9345462 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -22,17 +22,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.beam.sdk.testing.TestPipeline;
 
-/**
- * Utilities for working with the {@link ValueProvider} interface.
- */
+/** Utilities for working with the {@link ValueProvider} interface. */
 public class ValueProviders {
   private ValueProviders() {}
 
   /**
-   * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates
-   * the values according to the provided values in {@code runtimeValues}.
+   * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates the
+   * values according to the provided values in {@code runtimeValues}.
+   *
+   * @deprecated Use {@link TestPipeline#newProvider} for testing {@link ValueProvider} code.
    */
+  @Deprecated
   public static String updateSerializedOptions(
       String serializedOptions, Map<String, String> runtimeValues) {
     ObjectNode root, options;
@@ -41,8 +43,7 @@ public class ValueProviders {
       options = (ObjectNode) root.get("options");
       checkNotNull(options, "Unable to locate 'options' in %s", serializedOptions);
     } catch (IOException e) {
-      throw new RuntimeException(
-        String.format("Unable to parse %s", serializedOptions), e);
+      throw new RuntimeException(String.format("Unable to parse %s", serializedOptions), e);
     }
 
     for (Map.Entry<String, String> entry : runtimeValues.entrySet()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index b67b14f..be2f193 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -31,15 +31,19 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Strings;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.UUID;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricResult;
@@ -49,7 +53,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
@@ -341,7 +348,12 @@ public class TestPipeline extends Pipeline implements TestRule {
     final PipelineResult pipelineResult;
     try {
       enforcement.get().beforePipelineExecution();
-      pipelineResult = super.run(options);
+      PipelineOptions updatedOptions =
+          MAPPER.convertValue(MAPPER.valueToTree(options), PipelineOptions.class);
+      updatedOptions
+          .as(TestValueProviderOptions.class)
+          .setProviderRuntimeValues(StaticValueProvider.of(providerRuntimeValues));
+      pipelineResult = super.run(updatedOptions);
       verifyPAssertsSucceeded(this, pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();
@@ -358,6 +370,41 @@ public class TestPipeline extends Pipeline implements TestRule {
     return pipelineResult;
   }
 
+  /** Implementation detail of {@link #newProvider}; do not use. */
+  @Internal
+  public interface TestValueProviderOptions extends PipelineOptions {
+    ValueProvider<Map<String, Object>> getProviderRuntimeValues();
+    void setProviderRuntimeValues(ValueProvider<Map<String, Object>> runtimeValues);
+  }
+
+  /**
+   * Returns a new {@link ValueProvider} that is inaccessible before {@link #run}, but will be
+   * accessible while the pipeline runs.
+   */
+  public <T> ValueProvider<T> newProvider(T runtimeValue) {
+    String uuid = UUID.randomUUID().toString();
+    providerRuntimeValues.put(uuid, runtimeValue);
+    return ValueProvider.NestedValueProvider.of(
+        options.as(TestValueProviderOptions.class).getProviderRuntimeValues(),
+        new GetFromRuntimeValues<T>(uuid));
+  }
+
+  private final Map<String, Object> providerRuntimeValues = Maps.newHashMap();
+
+  private static class GetFromRuntimeValues<T>
+      implements SerializableFunction<Map<String, Object>, T> {
+    private final String key;
+
+    private GetFromRuntimeValues(String key) {
+      this.key = key;
+    }
+
+    @Override
+    public T apply(Map<String, Object> input) {
+      return (T) input.get(key);
+    }
+  }
+
   /**
    * Enables the abandoned node detection. Abandoned nodes are <code>PTransforms</code>, <code>
    * PAsserts</code> included, that were not executed by the pipeline runner. Abandoned nodes are

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 10ef428..917c070 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -796,8 +796,9 @@ public class DisplayData implements Serializable {
         // Don't re-wrap exceptions recursively.
         throw e;
       } catch (Throwable e) {
-        String msg = String.format("Error while populating display data for component: %s",
-            namespace.getName());
+        String msg = String.format(
+            "Error while populating display data for component '%s': %s",
+            namespace.getName(), e.getMessage());
         throw new PopulateDisplayDataException(msg, e);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index f49443d..8870dd8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -65,7 +65,6 @@ import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -226,14 +225,19 @@ public class AvroIOTest {
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
-    ValueProvider<String> pathProvider = StaticValueProvider.of(outputFile.getAbsolutePath());
-
     writePipeline
         .apply(Create.of(values))
-        .apply(AvroIO.write(GenericClass.class).to(pathProvider).withoutSharding());
+        .apply(
+            AvroIO.write(GenericClass.class)
+                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                .withoutSharding());
     writePipeline.run().waitUntilFinish();
 
-    PAssert.that(readPipeline.apply("Read", AvroIO.read(GenericClass.class).from(pathProvider)))
+    PAssert.that(
+            readPipeline.apply(
+                "Read",
+                AvroIO.read(GenericClass.class)
+                    .from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
         .containsInAnyOrder(values);
 
     readPipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index fb0a0d7..fe8a0f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -24,7 +24,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -62,7 +61,6 @@ import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TestRule;
@@ -797,7 +795,7 @@ public class ProxyInvocationHandlerTest {
 
     expectedException.expectMessage(
         ProxyInvocationHandler.PipelineOptionsDisplayData.class.getName());
-    expectedException.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
+    expectedException.expectMessage("oh noes!!");
     p.run();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
index 7bbbf7e..51a92e3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -194,16 +194,16 @@ public class ValueProviderTest {
     StaticValueProvider<String> provider = options.getBar();
   }
 
+
   @Test
   public void testSerializeDeserializeNoArg() throws Exception {
     TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
     assertFalse(submitOptions.getFoo().isAccessible());
-    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
 
-    String runnerString = ValueProviders.updateSerializedOptions(
-      serializedOptions, ImmutableMap.of("foo", "quux"));
-    TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
-      .as(TestOptions.class);
+    ObjectNode root = MAPPER.valueToTree(submitOptions);
+    ((ObjectNode) root.get("options")).put("foo", "quux");
+    TestOptions runtime =
+        MAPPER.convertValue(root, PipelineOptions.class).as(TestOptions.class);
 
     ValueProvider<String> vp = runtime.getFoo();
     assertTrue(vp.isAccessible());
@@ -214,14 +214,13 @@ public class ValueProviderTest {
   @Test
   public void testSerializeDeserializeWithArg() throws Exception {
     TestOptions submitOptions = PipelineOptionsFactory.fromArgs("--foo=baz").as(TestOptions.class);
-    assertEquals("baz", submitOptions.getFoo().get());
     assertTrue(submitOptions.getFoo().isAccessible());
-    String serializedOptions = MAPPER.writeValueAsString(submitOptions);
+    assertEquals("baz", submitOptions.getFoo().get());
 
-    String runnerString = ValueProviders.updateSerializedOptions(
-      serializedOptions, ImmutableMap.of("foo", "quux"));
-    TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class)
-      .as(TestOptions.class);
+    ObjectNode root = MAPPER.valueToTree(submitOptions);
+    ((ObjectNode) root.get("options")).put("foo", "quux");
+    TestOptions runtime =
+        MAPPER.convertValue(root, PipelineOptions.class).as(TestOptions.class);
 
     ValueProvider<String> vp = runtime.getFoo();
     assertTrue(vp.isAccessible());

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 664f2f4..ec681ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.testing;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
@@ -37,8 +38,10 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollection;
@@ -59,7 +62,8 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
   TestPipelineTest.TestPipelineCreationTest.class,
   TestPipelineTest.TestPipelineEnforcementsTest.WithRealPipelineRunner.class,
-  TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class
+  TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class,
+  TestPipelineTest.NewProviderTest.class
 })
 public class TestPipelineTest implements Serializable {
   private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
@@ -337,4 +341,35 @@ public class TestPipelineTest implements Serializable {
       }
     }
   }
+
+  /** Tests for {@link TestPipeline#newProvider}. */
+  @RunWith(JUnit4.class)
+  public static class NewProviderTest implements Serializable {
+    @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testNewProvider() {
+      ValueProvider<String> foo = pipeline.newProvider("foo");
+      ValueProvider<String> foobar =
+          ValueProvider.NestedValueProvider.of(
+              foo,
+              new SerializableFunction<String, String>() {
+                @Override
+                public String apply(String input) {
+                  return input + "bar";
+                }
+              });
+
+      assertFalse(foo.isAccessible());
+      assertFalse(foobar.isAccessible());
+
+      PAssert.that(pipeline.apply("create foo", Create.ofProvider(foo, StringUtf8Coder.of())))
+          .containsInAnyOrder("foo");
+      PAssert.that(pipeline.apply("create foobar", Create.ofProvider(foobar, StringUtf8Coder.of())))
+          .containsInAnyOrder("foobar");
+
+      pipeline.run();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 81ad947..1c7e1af 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -25,9 +25,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.InputStream;
@@ -52,7 +50,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.options.ValueProviders;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -60,7 +57,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -355,9 +351,6 @@ public class CreateTest {
     p.run();
   }
 
-  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
-      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-
   /** Testing options for {@link #testCreateOfProvider()}. */
   public interface CreateOfProviderOptions extends PipelineOptions {
     ValueProvider<String> getFoo();
@@ -385,19 +378,12 @@ public class CreateTest {
                         }),
                     StringUtf8Coder.of())))
         .containsInAnyOrder("foobar");
-    CreateOfProviderOptions submitOptions =
-        p.getOptions().as(CreateOfProviderOptions.class);
     PAssert.that(
-            p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of())))
-        .containsInAnyOrder("runtime foo");
-
-    String serializedOptions = MAPPER.writeValueAsString(p.getOptions());
-    String runnerString = ValueProviders.updateSerializedOptions(
-        serializedOptions, ImmutableMap.of("foo", "runtime foo"));
-    CreateOfProviderOptions runtimeOptions =
-        MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class);
+            p.apply(
+                "Runtime", Create.ofProvider(p.newProvider("runtimeFoo"), StringUtf8Coder.of())))
+        .containsInAnyOrder("runtimeFoo");
 
-    p.run(runtimeOptions);
+    p.run();
   }
 
 


[2/2] beam git commit: This closes #3753: [BEAM-2644] Introduces TestPipeline.newProvider()

Posted by jk...@apache.org.
This closes #3753: [BEAM-2644] Introduces TestPipeline.newProvider()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6c84053
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6c84053
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6c84053

Branch: refs/heads/master
Commit: f6c840533fbfce6e4aec87bbfc3d2ce813a7131d
Parents: 5c2cab0 f1b19b7
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 30 12:15:19 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 12:15:19 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/options/ValueProvider.java  | 10 ++--
 .../apache/beam/sdk/options/ValueProviders.java | 15 +++---
 .../apache/beam/sdk/testing/TestPipeline.java   | 49 +++++++++++++++++++-
 .../sdk/transforms/display/DisplayData.java     |  5 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++++--
 .../sdk/options/ProxyInvocationHandlerTest.java |  4 +-
 .../beam/sdk/options/ValueProviderTest.java     | 23 +++++----
 .../beam/sdk/testing/TestPipelineTest.java      | 37 ++++++++++++++-
 .../apache/beam/sdk/transforms/CreateTest.java  | 22 ++-------
 9 files changed, 127 insertions(+), 52 deletions(-)
----------------------------------------------------------------------