You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/27 22:24:02 UTC

[1/4] beam git commit: Adds AvroIO.read().withHintMatchesManyFiles()

Repository: beam
Updated Branches:
  refs/heads/master 5727ad28f -> 68bb7c0f5


Adds AvroIO.read().withHintMatchesManyFiles()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3335773
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3335773
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3335773

Branch: refs/heads/master
Commit: e3335773018a5644eb07b0f9b2b0619fbab19ec5
Parents: 476efa6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 18:13:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 53 +++++++++++++++++---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 13 ++++-
 2 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e3335773/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index bc7fecb..018b84f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
@@ -43,6 +44,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -84,6 +86,11 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * }</pre>
  *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small
+ * number of files.
+ *
  * <p>Reading from a {@link PCollection} of filepatterns:
  *
  * <pre>{@code
@@ -147,6 +154,7 @@ public class AvroIO {
     return new AutoValue_AvroIO_Read.Builder<T>()
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
+        .setHintMatchesManyFiles(false)
         .build();
   }
 
@@ -167,6 +175,7 @@ public class AvroIO {
     return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
+        .setHintMatchesManyFiles(false)
         .build();
   }
 
@@ -240,6 +249,7 @@ public class AvroIO {
     @Nullable abstract ValueProvider<String> getFilepattern();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
+    abstract boolean getHintMatchesManyFiles();
 
     abstract Builder<T> toBuilder();
 
@@ -248,11 +258,17 @@ public class AvroIO {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
 
       abstract Read<T> build();
     }
 
-    /** Reads from the given filename or filepattern. */
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
     public Read<T> from(ValueProvider<String> filepattern) {
       return toBuilder().setFilepattern(filepattern).build();
     }
@@ -262,16 +278,39 @@ public class AvroIO {
       return from(StaticValueProvider.of(filepattern));
     }
 
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
-      return input
-          .getPipeline()
-          .apply(
-              "Read",
-              org.apache.beam.sdk.io.Read.from(
-                  createSource(getFilepattern(), getRecordClass(), getSchema())));
+      if (getHintMatchesManyFiles()) {
+        ReadAll<T> readAll =
+            (getRecordClass() == GenericRecord.class)
+                ? (ReadAll<T>) readAllGenericRecords(getSchema())
+                : readAll(getRecordClass());
+        return input
+            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+            .apply(readAll);
+      } else {
+        return input
+            .getPipeline()
+            .apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(getFilepattern(), getRecordClass(), getSchema())));
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e3335773/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index df5d26c..90cd824 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -153,11 +153,20 @@ public class AvroIOTest {
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     writePipeline.run().waitUntilFinish();
 
-    // Test both read() and readAll()
+    // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll()
+    PAssert.that(
+            readPipeline.apply(
+                "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        .containsInAnyOrder(values);
     PAssert.that(
-            readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+            readPipeline.apply(
+                "Read withHintMatchesManyFiles",
+                AvroIO.read(GenericClass.class)
+                    .from(outputFile.getAbsolutePath())
+                    .withHintMatchesManyFiles()))
         .containsInAnyOrder(values);
     PAssert.that(
+            "ReadAll",
             readPipeline
                 .apply(Create.of(outputFile.getAbsolutePath()))
                 .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))


[4/4] beam git commit: This closes #3639: [BEAM-2640, BEAM-2641] Introduces TextIO/AvroIO.read().withHintMatchesManyFiles()

Posted by jk...@apache.org.
This closes #3639: [BEAM-2640, BEAM-2641] Introduces TextIO/AvroIO.read().withHintMatchesManyFiles()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68bb7c0f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68bb7c0f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68bb7c0f

Branch: refs/heads/master
Commit: 68bb7c0f53eff719224e42ed10e1cfe7414d2770
Parents: 5727ad2 e333577
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 27 14:44:02 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:44:02 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 53 +++++++++++++++++---
 .../java/org/apache/beam/sdk/io/TextIO.java     | 46 +++++++++++++----
 .../apache/beam/sdk/options/ValueProvider.java  |  6 +++
 .../apache/beam/sdk/options/ValueProviders.java |  2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  7 ++-
 .../org/apache/beam/sdk/transforms/Create.java  | 42 ++++++++++++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 13 ++++-
 .../org/apache/beam/sdk/io/TextIOReadTest.java  | 20 ++++++--
 .../apache/beam/sdk/transforms/CreateTest.java  | 53 ++++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 19 ++++---
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 13 +++--
 11 files changed, 236 insertions(+), 38 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: [BEAM-2641] Introduces TextIO.read().withHintMatchesManyFiles()

Posted by jk...@apache.org.
[BEAM-2641] Introduces TextIO.read().withHintMatchesManyFiles()

In that case it expands to TextIO.readAll().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/476efa65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/476efa65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/476efa65

Branch: refs/heads/master
Commit: 476efa65adc32da56c0ee4a3a85cddfb9e239b09
Parents: f515c22
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 19 11:51:33 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 46 +++++++++++++++-----
 .../org/apache/beam/sdk/io/TextIOReadTest.java  | 20 +++++++--
 .../apache/beam/sdk/transforms/CreateTest.java  |  1 +
 3 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/476efa65/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 73040da..765a842 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -34,11 +34,11 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
-import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -68,6 +68,11 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small
+ * number of files.
+ *
  * <p>Example 2: reading a PCollection of filenames.
  *
  * <pre>{@code
@@ -143,7 +148,10 @@ public class TextIO {
    * {@link PCollection} containing one element for each line of the input files.
    */
   public static Read read() {
-    return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
+    return new AutoValue_TextIO_Read.Builder()
+        .setCompressionType(CompressionType.AUTO)
+        .setHintMatchesManyFiles(false)
+        .build();
   }
 
   /**
@@ -206,6 +214,7 @@ public class TextIO {
   public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract CompressionType getCompressionType();
+    abstract boolean getHintMatchesManyFiles();
 
     abstract Builder toBuilder();
 
@@ -213,6 +222,7 @@ public class TextIO {
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
       abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
 
       abstract Read build();
     }
@@ -226,6 +236,9 @@ public class TextIO {
      *
      * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
      * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
      */
     public Read from(String filepattern) {
       checkNotNull(filepattern, "Filepattern cannot be empty.");
@@ -248,17 +261,28 @@ public class TextIO {
       return toBuilder().setCompressionType(compressionType).build();
     }
 
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only
+     * a small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
     @Override
     public PCollection<String> expand(PBegin input) {
-      if (getFilepattern() == null) {
-        throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
-      }
-
-      final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
-      PCollection<String> pcol = input.getPipeline().apply("Read", read);
-      // Honor the default output coder that would have been used by this PTransform.
-      pcol.setCoder(getDefaultOutputCoder());
-      return pcol;
+      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+      return getHintMatchesManyFiles()
+          ? input
+              .apply(
+                  "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+              .apply(readAll().withCompressionType(getCompressionType()))
+          : input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
     }
 
     // Helper to create a source specific to the requested compression type.

http://git-wip-us.apache.org/repos/asf/beam/blob/476efa65/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index e733010..8ad6030 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -290,9 +290,16 @@ public class TextIOReadTest {
   }
 
   /**
-   * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) and
-   * TextIO.readAll().withCompressionType(compressionType) applied to the single filename,
-   * and asserts that the results match the given expected output.
+   * Helper method that runs a variety of ways to read a single file using TextIO
+   * and checks that they all match the given expected output.
+   *
+   * <p>The transforms being verified are:
+   * <ul>
+   *   <li>TextIO.read().from(filename).withCompressionType(compressionType)
+   *   <li>TextIO.read().from(filename).withCompressionType(compressionType)
+   *     .withHintMatchesManyFiles()
+   *   <li>TextIO.readAll().withCompressionType(compressionType)
+   * </ul>
    */
   private void assertReadingCompressedFileMatchesExpected(
       File file, CompressionType compressionType, List<String> expected) {
@@ -300,10 +307,17 @@ public class TextIOReadTest {
     int thisUniquifier = ++uniquifier;
 
     TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType);
+
     PAssert.that(
             p.apply("Read_" + file + "_" + compressionType.toString() + "_" + thisUniquifier, read))
         .containsInAnyOrder(expected);
 
+    PAssert.that(
+            p.apply(
+                "Read_" + file + "_" + compressionType.toString() + "_many" + "_" + thisUniquifier,
+                read.withHintMatchesManyFiles()))
+        .containsInAnyOrder(expected);
+
     TextIO.ReadAll readAll =
         TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10);
     PAssert.that(

http://git-wip-us.apache.org/repos/asf/beam/blob/476efa65/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 1e7ce2d..6a682ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -363,6 +363,7 @@ public class CreateTest {
   private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
       ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
+  /** Testing options for {@link #testCreateOfProvider()}. */
   public interface CreateOfProviderOptions extends PipelineOptions {
     ValueProvider<String> getFoo();
     void setFoo(ValueProvider<String> value);


[3/4] beam git commit: [BEAM-2640] Introduces Create.ofProvider(ValueProvider)

Posted by jk...@apache.org.
[BEAM-2640] Introduces Create.ofProvider(ValueProvider)

I also converted DatastoreV1 to use this overload, and, as an
exercise, added a withQuery(ValueProvider) overload to JdbcIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f515c22d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f515c22d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f515c22d

Branch: refs/heads/master
Commit: f515c22d6bd583cb97fb33c6537c1ecc6513141a
Parents: 5727ad2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 19 11:50:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/options/ValueProvider.java  |  6 +++
 .../apache/beam/sdk/options/ValueProviders.java |  2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  7 ++-
 .../org/apache/beam/sdk/transforms/Create.java  | 42 ++++++++++++++++
 .../apache/beam/sdk/transforms/CreateTest.java  | 52 ++++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 19 ++++---
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 13 +++--
 7 files changed, 126 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index c7f1e09..94187a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -41,13 +41,19 @@ import java.lang.reflect.Proxy;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * A {@link ValueProvider} abstracts the notion of fetching a value that may or may not be currently
  * available.
  *
  * <p>This can be used to parameterize transforms that only read values in at runtime, for example.
+ *
+ * <p>A common task is to create a {@link PCollection} containing the value of this
+ * {@link ValueProvider} regardless of whether it's accessible at construction time or not.
+ * For that, use {@link Create#ofProvider}.
  */
 @JsonSerialize(using = ValueProvider.Serializer.class)
 @JsonDeserialize(using = ValueProvider.Deserializer.class)

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index 1cc46fe..bc479a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
 /**
  * Utilities for working with the {@link ValueProvider} interface.
  */
-class ValueProviders {
+public class ValueProviders {
   private ValueProviders() {}
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 34f1c83..b67b14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -328,6 +328,11 @@ public class TestPipeline extends Pipeline implements TestRule {
    * testing.
    */
   public PipelineResult run() {
+    return run(getOptions());
+  }
+
+  /** Like {@link #run} but with the given potentially modified options. */
+  public PipelineResult run(PipelineOptions options) {
     checkState(
         enforcement.isPresent(),
         "Is your TestPipeline declaration missing a @Rule annotation? Usage: "
@@ -336,7 +341,7 @@ public class TestPipeline extends Pipeline implements TestRule {
     final PipelineResult pipelineResult;
     try {
       enforcement.get().beforePipelineExecution();
-      pipelineResult = super.run();
+      pipelineResult = super.run(options);
       verifyPAssertsSucceeded(this, pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7af8fb8..09e12ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -52,6 +53,7 @@ import org.apache.beam.sdk.io.OffsetBasedSource;
 import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -200,6 +202,14 @@ public class Create<T> {
   }
 
   /**
+   * Returns an {@link OfValueProvider} transform that produces a {@link PCollection}
+   * of a single element provided by the given {@link ValueProvider}.
+   */
+  public static <T> OfValueProvider<T> ofProvider(ValueProvider<T> provider, Coder<T> coder) {
+    return new OfValueProvider<>(provider, coder);
+  }
+
+  /**
    * Returns a new {@link Create.TimestampedValues} transform that produces a
    * {@link PCollection} containing the elements of the provided {@code Iterable}
    * with the specified timestamps.
@@ -485,6 +495,38 @@ public class Create<T> {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /** Implementation of {@link #ofProvider}. */
+  public static class OfValueProvider<T> extends PTransform<PBegin, PCollection<T>> {
+    private final ValueProvider<T> provider;
+    private final Coder<T> coder;
+
+    private OfValueProvider(ValueProvider<T> provider, Coder<T> coder) {
+      this.provider = checkNotNull(provider, "provider");
+      this.coder = checkNotNull(coder, "coder");
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (provider.isAccessible()) {
+        Values<T> values = Create.of(provider.get());
+        return input.apply(values.withCoder(coder));
+      }
+      return input
+          .apply(Create.of((Void) null))
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Void, T>() {
+                    @Override
+                    public T apply(Void input) {
+                      return provider.get();
+                    }
+                  }))
+          .setCoder(coder);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
   /**
    * A {@code PTransform} that creates a {@code PCollection} whose elements have
    * associated timestamps.

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a05d31c..1e7ce2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -25,7 +25,9 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.InputStream;
@@ -47,6 +49,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.options.ValueProviders;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -54,6 +60,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -353,6 +360,51 @@ public class CreateTest {
     p.run();
   }
 
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+  public interface CreateOfProviderOptions extends PipelineOptions {
+    ValueProvider<String> getFoo();
+    void setFoo(ValueProvider<String> value);
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testCreateOfProvider() throws Exception {
+    PAssert.that(
+            p.apply(
+                "Static", Create.ofProvider(StaticValueProvider.of("foo"), StringUtf8Coder.of())))
+        .containsInAnyOrder("foo");
+    PAssert.that(
+            p.apply(
+                "Static nested",
+                Create.ofProvider(
+                    NestedValueProvider.of(
+                        StaticValueProvider.of("foo"),
+                        new SerializableFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return input + "bar";
+                          }
+                        }),
+                    StringUtf8Coder.of())))
+        .containsInAnyOrder("foobar");
+    CreateOfProviderOptions submitOptions =
+        p.getOptions().as(CreateOfProviderOptions.class);
+    PAssert.that(
+            p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of())))
+        .containsInAnyOrder("runtime foo");
+
+    String serializedOptions = MAPPER.writeValueAsString(p.getOptions());
+    String runnerString = ValueProviders.updateSerializedOptions(
+        serializedOptions, ImmutableMap.of("foo", "runtime foo"));
+    CreateOfProviderOptions runtimeOptions =
+        MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class);
+
+    p.run(runtimeOptions);
+  }
+
+
   @Test
   public void testCreateGetName() {
     assertEquals("Create.Values", Create.of(1, 2, 3).getName());

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 1ed6430..7e40db4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,7 +71,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -99,7 +99,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -611,10 +610,10 @@ public class DatastoreV1 {
       if (getQuery() != null) {
         inputQuery = input.apply(Create.of(getQuery()));
       } else {
-        inputQuery = input
-            .apply(Create.of(getLiteralGqlQuery())
-                .withCoder(SerializableCoder.of(new TypeDescriptor<ValueProvider<String>>() {})))
-            .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+        inputQuery =
+            input
+                .apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
+                .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
       }
 
       PCollection<KV<Integer, Query>> splitQueries = inputQuery
@@ -730,7 +729,7 @@ public class DatastoreV1 {
     /**
      * A DoFn that translates a Cloud Datastore gql query string to {@code Query}.
      */
-    static class GqlQueryTranslateFn extends DoFn<ValueProvider<String>, Query> {
+    static class GqlQueryTranslateFn extends DoFn<String, Query> {
       private final V1Options v1Options;
       private transient Datastore datastore;
       private final V1DatastoreFactory datastoreFactory;
@@ -751,9 +750,9 @@ public class DatastoreV1 {
 
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
-        ValueProvider<String> gqlQuery = c.element();
-        LOG.info("User query: '{}'", gqlQuery.get());
-        Query query = translateGqlQueryWithLimitCheck(gqlQuery.get(), datastore,
+        String gqlQuery = c.element();
+        LOG.info("User query: '{}'", gqlQuery);
+        Query query = translateGqlQueryWithLimitCheck(gqlQuery, datastore,
             v1Options.getNamespace());
         LOG.info("User gql query translated to Query({})", query);
         c.output(query);

http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index bf73dbe..51f34ae 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -31,7 +31,9 @@ import javax.annotation.Nullable;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -272,7 +274,7 @@ public class JdbcIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
-    @Nullable abstract String getQuery();
+    @Nullable abstract ValueProvider<String> getQuery();
     @Nullable abstract StatementPreparator getStatementPreparator();
     @Nullable abstract RowMapper<T> getRowMapper();
     @Nullable abstract Coder<T> getCoder();
@@ -282,7 +284,7 @@ public class JdbcIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
-      abstract Builder<T> setQuery(String query);
+      abstract Builder<T> setQuery(ValueProvider<String> query);
       abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);
       abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
       abstract Builder<T> setCoder(Coder<T> coder);
@@ -297,6 +299,11 @@ public class JdbcIO {
 
     public Read<T> withQuery(String query) {
       checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
+      return withQuery(ValueProvider.StaticValueProvider.of(query));
+    }
+
+    public Read<T> withQuery(ValueProvider<String> query) {
+      checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
       return toBuilder().setQuery(query).build();
     }
 
@@ -321,7 +328,7 @@ public class JdbcIO {
     @Override
     public PCollection<T> expand(PBegin input) {
       return input
-          .apply(Create.of(getQuery()))
+          .apply(Create.ofProvider(getQuery(), StringUtf8Coder.of()))
           .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
           .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
             private Random random;