You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/27 22:24:04 UTC

[3/4] beam git commit: [BEAM-2640] Introduces Create.ofProvider(ValueProvider)

[BEAM-2640] Introduces Create.ofProvider(ValueProvider)

I also converted DatastoreV1 to use this overload, and, as an
exercise, added a withQuery(ValueProvider) overload to JdbcIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f515c22d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f515c22d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f515c22d

Branch: refs/heads/master
Commit: f515c22d6bd583cb97fb33c6537c1ecc6513141a
Parents: 5727ad2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 19 11:50:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/options/ValueProvider.java  |  6 +++
 .../apache/beam/sdk/options/ValueProviders.java |  2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  7 ++-
 .../org/apache/beam/sdk/transforms/Create.java  | 42 ++++++++++++++++
 .../apache/beam/sdk/transforms/CreateTest.java  | 52 ++++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 19 ++++---
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 13 +++--
 7 files changed, 126 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index c7f1e09..94187a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -41,13 +41,19 @@ import java.lang.reflect.Proxy;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * A {@link ValueProvider} abstracts the notion of fetching a value that may or may not be currently
  * available.
  *
  * <p>This can be used to parameterize transforms that only read values in at runtime, for example.
+ *
+ * <p>A common task is to create a {@link PCollection} containing the value of this
+ * {@link ValueProvider} regardless of whether it's accessible at construction time or not.
+ * For that, use {@link Create#ofProvider}.
  */
 @JsonSerialize(using = ValueProvider.Serializer.class)
 @JsonDeserialize(using = ValueProvider.Deserializer.class)

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index 1cc46fe..bc479a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
 /**
  * Utilities for working with the {@link ValueProvider} interface.
  */
-class ValueProviders {
+public class ValueProviders {
   private ValueProviders() {}
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 34f1c83..b67b14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -328,6 +328,11 @@ public class TestPipeline extends Pipeline implements TestRule {
    * testing.
    */
   public PipelineResult run() {
+    return run(getOptions());
+  }
+
+  /** Like {@link #run()} but with the given potentially modified options. */
+  public PipelineResult run(PipelineOptions options) {
     checkState(
         enforcement.isPresent(),
         "Is your TestPipeline declaration missing a @Rule annotation? Usage: "
@@ -336,7 +341,7 @@ public class TestPipeline extends Pipeline implements TestRule {
     final PipelineResult pipelineResult;
     try {
       enforcement.get().beforePipelineExecution();
-      pipelineResult = super.run();
+      pipelineResult = super.run(options);
       verifyPAssertsSucceeded(this, pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7af8fb8..09e12ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -52,6 +53,7 @@ import org.apache.beam.sdk.io.OffsetBasedSource;
 import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -200,6 +202,14 @@ public class Create<T> {
   }
 
   /**
+   * Returns an {@link OfValueProvider} transform that produces a {@link PCollection}
+   * of a single element provided by the given {@link ValueProvider}.
+   */
+  public static <T> OfValueProvider<T> ofProvider(ValueProvider<T> provider, Coder<T> coder) {
+    return new OfValueProvider<>(provider, coder);
+  }
+
+  /**
    * Returns a new {@link Create.TimestampedValues} transform that produces a
    * {@link PCollection} containing the elements of the provided {@code Iterable}
    * with the specified timestamps.
@@ -485,6 +495,38 @@ public class Create<T> {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /** Implementation of {@link #ofProvider}. */
+  public static class OfValueProvider<T> extends PTransform<PBegin, PCollection<T>> {
+    private final ValueProvider<T> provider;
+    private final Coder<T> coder;
+
+    private OfValueProvider(ValueProvider<T> provider, Coder<T> coder) {
+      this.provider = checkNotNull(provider, "provider");
+      this.coder = checkNotNull(coder, "coder");
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (provider.isAccessible()) {
+        Values<T> values = Create.of(provider.get());
+        return input.apply(values.withCoder(coder));
+      }
+      return input
+          .apply(Create.of((Void) null))
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Void, T>() {
+                    @Override
+                    public T apply(Void input) {
+                      return provider.get();
+                    }
+                  }))
+          .setCoder(coder);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
   /**
    * A {@code PTransform} that creates a {@code PCollection} whose elements have
    * associated timestamps.

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a05d31c..1e7ce2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -25,7 +25,9 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.InputStream;
@@ -47,6 +49,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.options.ValueProviders;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -54,6 +60,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -353,6 +360,51 @@ public class CreateTest {
     p.run();
   }
 
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+  public interface CreateOfProviderOptions extends PipelineOptions {
+    ValueProvider<String> getFoo();
+    void setFoo(ValueProvider<String> value);
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testCreateOfProvider() throws Exception {
+    PAssert.that(
+            p.apply(
+                "Static", Create.ofProvider(StaticValueProvider.of("foo"), StringUtf8Coder.of())))
+        .containsInAnyOrder("foo");
+    PAssert.that(
+            p.apply(
+                "Static nested",
+                Create.ofProvider(
+                    NestedValueProvider.of(
+                        StaticValueProvider.of("foo"),
+                        new SerializableFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return input + "bar";
+                          }
+                        }),
+                    StringUtf8Coder.of())))
+        .containsInAnyOrder("foobar");
+    CreateOfProviderOptions submitOptions =
+        p.getOptions().as(CreateOfProviderOptions.class);
+    PAssert.that(
+            p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of())))
+        .containsInAnyOrder("runtime foo");
+
+    String serializedOptions = MAPPER.writeValueAsString(p.getOptions());
+    String runnerString = ValueProviders.updateSerializedOptions(
+        serializedOptions, ImmutableMap.of("foo", "runtime foo"));
+    CreateOfProviderOptions runtimeOptions =
+        MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class);
+
+    p.run(runtimeOptions);
+  }
+
+
   @Test
   public void testCreateGetName() {
     assertEquals("Create.Values", Create.of(1, 2, 3).getName());

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 1ed6430..7e40db4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,7 +71,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -99,7 +99,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -611,10 +610,10 @@ public class DatastoreV1 {
       if (getQuery() != null) {
         inputQuery = input.apply(Create.of(getQuery()));
       } else {
-        inputQuery = input
-            .apply(Create.of(getLiteralGqlQuery())
-                .withCoder(SerializableCoder.of(new TypeDescriptor<ValueProvider<String>>() {})))
-            .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+        inputQuery =
+            input
+                .apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
+                .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
       }
 
       PCollection<KV<Integer, Query>> splitQueries = inputQuery
@@ -730,7 +729,7 @@ public class DatastoreV1 {
     /**
      * A DoFn that translates a Cloud Datastore gql query string to {@code Query}.
      */
-    static class GqlQueryTranslateFn extends DoFn<ValueProvider<String>, Query> {
+    static class GqlQueryTranslateFn extends DoFn<String, Query> {
       private final V1Options v1Options;
       private transient Datastore datastore;
       private final V1DatastoreFactory datastoreFactory;
@@ -751,9 +750,9 @@ public class DatastoreV1 {
 
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
-        ValueProvider<String> gqlQuery = c.element();
-        LOG.info("User query: '{}'", gqlQuery.get());
-        Query query = translateGqlQueryWithLimitCheck(gqlQuery.get(), datastore,
+        String gqlQuery = c.element();
+        LOG.info("User query: '{}'", gqlQuery);
+        Query query = translateGqlQueryWithLimitCheck(gqlQuery, datastore,
             v1Options.getNamespace());
         LOG.info("User gql query translated to Query({})", query);
         c.output(query);

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index bf73dbe..51f34ae 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -31,7 +31,9 @@ import javax.annotation.Nullable;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -272,7 +274,7 @@ public class JdbcIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
-    @Nullable abstract String getQuery();
+    @Nullable abstract ValueProvider<String> getQuery();
     @Nullable abstract StatementPreparator getStatementPreparator();
     @Nullable abstract RowMapper<T> getRowMapper();
     @Nullable abstract Coder<T> getCoder();
@@ -282,7 +284,7 @@ public class JdbcIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
-      abstract Builder<T> setQuery(String query);
+      abstract Builder<T> setQuery(ValueProvider<String> query);
       abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);
       abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
       abstract Builder<T> setCoder(Coder<T> coder);
@@ -297,6 +299,11 @@ public class JdbcIO {
 
     public Read<T> withQuery(String query) {
       checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
+      return withQuery(ValueProvider.StaticValueProvider.of(query));
+    }
+
+    public Read<T> withQuery(ValueProvider<String> query) {
+      checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
       return toBuilder().setQuery(query).build();
     }
 
@@ -321,7 +328,7 @@ public class JdbcIO {
     @Override
     public PCollection<T> expand(PBegin input) {
       return input
-          .apply(Create.of(getQuery()))
+          .apply(Create.ofProvider(getQuery(), StringUtf8Coder.of()))
           .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
           .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
             private Random random;