You are viewing a plain text version of this content. The canonical link for it was present in the original HTML version of this archive page (link not preserved in this plain-text extraction).
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/27 22:24:02 UTC
[1/4] beam git commit: Adds AvroIO.read().withHintMatchesManyFiles()
Repository: beam
Updated Branches:
refs/heads/master 5727ad28f -> 68bb7c0f5
Adds AvroIO.read().withHintMatchesManyFiles()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3335773
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3335773
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3335773
Branch: refs/heads/master
Commit: e3335773018a5644eb07b0f9b2b0619fbab19ec5
Parents: 476efa6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 18:13:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 53 +++++++++++++++++---
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 13 ++++-
2 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e3335773/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index bc7fecb..018b84f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
@@ -43,6 +44,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -84,6 +86,11 @@ import org.apache.beam.sdk.values.PDone;
* .from("gs://my_bucket/path/to/records-*.avro"));
* }</pre>
*
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small
+ * number of files.
+ *
* <p>Reading from a {@link PCollection} of filepatterns:
*
* <pre>{@code
@@ -147,6 +154,7 @@ public class AvroIO {
return new AutoValue_AvroIO_Read.Builder<T>()
.setRecordClass(recordClass)
.setSchema(ReflectData.get().getSchema(recordClass))
+ .setHintMatchesManyFiles(false)
.build();
}
@@ -167,6 +175,7 @@ public class AvroIO {
return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
.setRecordClass(GenericRecord.class)
.setSchema(schema)
+ .setHintMatchesManyFiles(false)
.build();
}
@@ -240,6 +249,7 @@ public class AvroIO {
@Nullable abstract ValueProvider<String> getFilepattern();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
+ abstract boolean getHintMatchesManyFiles();
abstract Builder<T> toBuilder();
@@ -248,11 +258,17 @@ public class AvroIO {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
+ abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
abstract Read<T> build();
}
- /** Reads from the given filename or filepattern. */
+ /**
+ * Reads from the given filename or filepattern.
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (at least tens
+ * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+ */
public Read<T> from(ValueProvider<String> filepattern) {
return toBuilder().setFilepattern(filepattern).build();
}
@@ -262,16 +278,39 @@ public class AvroIO {
return from(StaticValueProvider.of(filepattern));
}
+ /**
+ * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+ * files.
+ *
+ * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+ * performance for this case, but it may worsen performance if the filepattern matches only a
+ * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+ * happen less efficiently within individual files).
+ */
+ public Read<T> withHintMatchesManyFiles() {
+ return toBuilder().setHintMatchesManyFiles(true).build();
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "filepattern");
checkNotNull(getSchema(), "schema");
- return input
- .getPipeline()
- .apply(
- "Read",
- org.apache.beam.sdk.io.Read.from(
- createSource(getFilepattern(), getRecordClass(), getSchema())));
+ if (getHintMatchesManyFiles()) {
+ ReadAll<T> readAll =
+ (getRecordClass() == GenericRecord.class)
+ ? (ReadAll<T>) readAllGenericRecords(getSchema())
+ : readAll(getRecordClass());
+ return input
+ .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply(readAll);
+ } else {
+ return input
+ .getPipeline()
+ .apply(
+ "Read",
+ org.apache.beam.sdk.io.Read.from(
+ createSource(getFilepattern(), getRecordClass(), getSchema())));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e3335773/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index df5d26c..90cd824 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -153,11 +153,20 @@ public class AvroIOTest {
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
writePipeline.run().waitUntilFinish();
- // Test both read() and readAll()
+ // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll()
+ PAssert.that(
+ readPipeline.apply(
+ "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+ .containsInAnyOrder(values);
PAssert.that(
- readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+ readPipeline.apply(
+ "Read withHintMatchesManyFiles",
+ AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath())
+ .withHintMatchesManyFiles()))
.containsInAnyOrder(values);
PAssert.that(
+ "ReadAll",
readPipeline
.apply(Create.of(outputFile.getAbsolutePath()))
.apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
[4/4] beam git commit: This closes #3639: [BEAM-2640,
BEAM-2641] Introduces TextIO/AvroIO.read().withHintMatchesManyFiles()
Posted by jk...@apache.org.
This closes #3639: [BEAM-2640, BEAM-2641] Introduces TextIO/AvroIO.read().withHintMatchesManyFiles()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68bb7c0f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68bb7c0f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68bb7c0f
Branch: refs/heads/master
Commit: 68bb7c0f53eff719224e42ed10e1cfe7414d2770
Parents: 5727ad2 e333577
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 27 14:44:02 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:44:02 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 53 +++++++++++++++++---
.../java/org/apache/beam/sdk/io/TextIO.java | 46 +++++++++++++----
.../apache/beam/sdk/options/ValueProvider.java | 6 +++
.../apache/beam/sdk/options/ValueProviders.java | 2 +-
.../apache/beam/sdk/testing/TestPipeline.java | 7 ++-
.../org/apache/beam/sdk/transforms/Create.java | 42 ++++++++++++++++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 13 ++++-
.../org/apache/beam/sdk/io/TextIOReadTest.java | 20 ++++++--
.../apache/beam/sdk/transforms/CreateTest.java | 53 ++++++++++++++++++++
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 19 ++++---
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 13 +++--
11 files changed, 236 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
[2/4] beam git commit: [BEAM-2641] Introduces
TextIO.read().withHintMatchesManyFiles()
Posted by jk...@apache.org.
[BEAM-2641] Introduces TextIO.read().withHintMatchesManyFiles()
In that case it expands to TextIO.readAll().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/476efa65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/476efa65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/476efa65
Branch: refs/heads/master
Commit: 476efa65adc32da56c0ee4a3a85cddfb9e239b09
Parents: f515c22
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 19 11:51:33 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 46 +++++++++++++++-----
.../org/apache/beam/sdk/io/TextIOReadTest.java | 20 +++++++--
.../apache/beam/sdk/transforms/CreateTest.java | 1 +
3 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/476efa65/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 73040da..765a842 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -34,11 +34,11 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
-import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -68,6 +68,11 @@ import org.apache.beam.sdk.values.PDone;
* PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small
+ * number of files.
+ *
* <p>Example 2: reading a PCollection of filenames.
*
* <pre>{@code
@@ -143,7 +148,10 @@ public class TextIO {
* {@link PCollection} containing one element for each line of the input files.
*/
public static Read read() {
- return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
+ return new AutoValue_TextIO_Read.Builder()
+ .setCompressionType(CompressionType.AUTO)
+ .setHintMatchesManyFiles(false)
+ .build();
}
/**
@@ -206,6 +214,7 @@ public class TextIO {
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@Nullable abstract ValueProvider<String> getFilepattern();
abstract CompressionType getCompressionType();
+ abstract boolean getHintMatchesManyFiles();
abstract Builder toBuilder();
@@ -213,6 +222,7 @@ public class TextIO {
abstract static class Builder {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
abstract Read build();
}
@@ -226,6 +236,9 @@ public class TextIO {
*
* <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
* Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (at least tens
+ * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
*/
public Read from(String filepattern) {
checkNotNull(filepattern, "Filepattern cannot be empty.");
@@ -248,17 +261,28 @@ public class TextIO {
return toBuilder().setCompressionType(compressionType).build();
}
+ /**
+ * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+ * files.
+ *
+ * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+ * performance for this case, but it may worsen performance if the filepattern matches only
+ * a small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+ * happen less efficiently within individual files).
+ */
+ public Read withHintMatchesManyFiles() {
+ return toBuilder().setHintMatchesManyFiles(true).build();
+ }
+
@Override
public PCollection<String> expand(PBegin input) {
- if (getFilepattern() == null) {
- throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
- }
-
- final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
- PCollection<String> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
+ checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+ return getHintMatchesManyFiles()
+ ? input
+ .apply(
+ "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply(readAll().withCompressionType(getCompressionType()))
+ : input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
}
// Helper to create a source specific to the requested compression type.
http://git-wip-us.apache.org/repos/asf/beam/blob/476efa65/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index e733010..8ad6030 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -290,9 +290,16 @@ public class TextIOReadTest {
}
/**
- * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) and
- * TextIO.readAll().withCompressionType(compressionType) applied to the single filename,
- * and asserts that the results match the given expected output.
+ * Helper method that runs a variety of ways to read a single file using TextIO
+ * and checks that they all match the given expected output.
+ *
+ * <p>The transforms being verified are:
+ * <ul>
+ * <li>TextIO.read().from(filename).withCompressionType(compressionType)
+ * <li>TextIO.read().from(filename).withCompressionType(compressionType)
+ * .withHintMatchesManyFiles()
+ * <li>TextIO.readAll().withCompressionType(compressionType)
</ul>
*/
private void assertReadingCompressedFileMatchesExpected(
File file, CompressionType compressionType, List<String> expected) {
@@ -300,10 +307,17 @@ public class TextIOReadTest {
int thisUniquifier = ++uniquifier;
TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType);
+
PAssert.that(
p.apply("Read_" + file + "_" + compressionType.toString() + "_" + thisUniquifier, read))
.containsInAnyOrder(expected);
+ PAssert.that(
+ p.apply(
+ "Read_" + file + "_" + compressionType.toString() + "_many" + "_" + thisUniquifier,
+ read.withHintMatchesManyFiles()))
+ .containsInAnyOrder(expected);
+
TextIO.ReadAll readAll =
TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10);
PAssert.that(
http://git-wip-us.apache.org/repos/asf/beam/blob/476efa65/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 1e7ce2d..6a682ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -363,6 +363,7 @@ public class CreateTest {
private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+ /** Testing options for {@link #testCreateOfProvider()}. */
public interface CreateOfProviderOptions extends PipelineOptions {
ValueProvider<String> getFoo();
void setFoo(ValueProvider<String> value);
[3/4] beam git commit: [BEAM-2640] Introduces
Create.ofProvider(ValueProvider)
Posted by jk...@apache.org.
[BEAM-2640] Introduces Create.ofProvider(ValueProvider)
I also converted DatastoreV1 to use this overload, and, as an
exercise, added a withQuery(ValueProvider) overload to JdbcIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f515c22d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f515c22d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f515c22d
Branch: refs/heads/master
Commit: f515c22d6bd583cb97fb33c6537c1ecc6513141a
Parents: 5727ad2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 19 11:50:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/options/ValueProvider.java | 6 +++
.../apache/beam/sdk/options/ValueProviders.java | 2 +-
.../apache/beam/sdk/testing/TestPipeline.java | 7 ++-
.../org/apache/beam/sdk/transforms/Create.java | 42 ++++++++++++++++
.../apache/beam/sdk/transforms/CreateTest.java | 52 ++++++++++++++++++++
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 19 ++++---
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 13 +++--
7 files changed, 126 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index c7f1e09..94187a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -41,13 +41,19 @@ import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
/**
* A {@link ValueProvider} abstracts the notion of fetching a value that may or may not be currently
* available.
*
* <p>This can be used to parameterize transforms that only read values in at runtime, for example.
+ *
+ * <p>A common task is to create a {@link PCollection} containing the value of this
+ * {@link ValueProvider} regardless of whether it's accessible at construction time or not.
+ * For that, use {@link Create#ofProvider}.
*/
@JsonSerialize(using = ValueProvider.Serializer.class)
@JsonDeserialize(using = ValueProvider.Deserializer.class)
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index 1cc46fe..bc479a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
* Utilities for working with the {@link ValueProvider} interface.
*/
-class ValueProviders {
+public class ValueProviders {
private ValueProviders() {}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 34f1c83..b67b14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -328,6 +328,11 @@ public class TestPipeline extends Pipeline implements TestRule {
* testing.
*/
public PipelineResult run() {
+ return run(getOptions());
+ }
+
+ /** Like {@link #run} but with the given potentially modified options. */
+ public PipelineResult run(PipelineOptions options) {
checkState(
enforcement.isPresent(),
"Is your TestPipeline declaration missing a @Rule annotation? Usage: "
@@ -336,7 +341,7 @@ public class TestPipeline extends Pipeline implements TestRule {
final PipelineResult pipelineResult;
try {
enforcement.get().beforePipelineExecution();
- pipelineResult = super.run();
+ pipelineResult = super.run(options);
verifyPAssertsSucceeded(this, pipelineResult);
} catch (RuntimeException exc) {
Throwable cause = exc.getCause();
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7af8fb8..09e12ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -52,6 +53,7 @@ import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
@@ -200,6 +202,14 @@ public class Create<T> {
}
/**
+ * Returns an {@link OfValueProvider} transform that produces a {@link PCollection}
+ * of a single element provided by the given {@link ValueProvider}.
+ */
+ public static <T> OfValueProvider<T> ofProvider(ValueProvider<T> provider, Coder<T> coder) {
+ return new OfValueProvider<>(provider, coder);
+ }
+
+ /**
* Returns a new {@link Create.TimestampedValues} transform that produces a
* {@link PCollection} containing the elements of the provided {@code Iterable}
* with the specified timestamps.
@@ -485,6 +495,38 @@ public class Create<T> {
/////////////////////////////////////////////////////////////////////////////
+ /** Implementation of {@link #ofProvider}. */
+ public static class OfValueProvider<T> extends PTransform<PBegin, PCollection<T>> {
+ private final ValueProvider<T> provider;
+ private final Coder<T> coder;
+
+ private OfValueProvider(ValueProvider<T> provider, Coder<T> coder) {
+ this.provider = checkNotNull(provider, "provider");
+ this.coder = checkNotNull(coder, "coder");
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ if (provider.isAccessible()) {
+ Values<T> values = Create.of(provider.get());
+ return input.apply(values.withCoder(coder));
+ }
+ return input
+ .apply(Create.of((Void) null))
+ .apply(
+ MapElements.via(
+ new SimpleFunction<Void, T>() {
+ @Override
+ public T apply(Void input) {
+ return provider.get();
+ }
+ }))
+ .setCoder(coder);
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
/**
* A {@code PTransform} that creates a {@code PCollection} whose elements have
* associated timestamps.
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a05d31c..1e7ce2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -25,7 +25,9 @@ import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
@@ -47,6 +49,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.options.ValueProviders;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -54,6 +60,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -353,6 +360,51 @@ public class CreateTest {
p.run();
}
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+ public interface CreateOfProviderOptions extends PipelineOptions {
+ ValueProvider<String> getFoo();
+ void setFoo(ValueProvider<String> value);
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testCreateOfProvider() throws Exception {
+ PAssert.that(
+ p.apply(
+ "Static", Create.ofProvider(StaticValueProvider.of("foo"), StringUtf8Coder.of())))
+ .containsInAnyOrder("foo");
+ PAssert.that(
+ p.apply(
+ "Static nested",
+ Create.ofProvider(
+ NestedValueProvider.of(
+ StaticValueProvider.of("foo"),
+ new SerializableFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input + "bar";
+ }
+ }),
+ StringUtf8Coder.of())))
+ .containsInAnyOrder("foobar");
+ CreateOfProviderOptions submitOptions =
+ p.getOptions().as(CreateOfProviderOptions.class);
+ PAssert.that(
+ p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of())))
+ .containsInAnyOrder("runtime foo");
+
+ String serializedOptions = MAPPER.writeValueAsString(p.getOptions());
+ String runnerString = ValueProviders.updateSerializedOptions(
+ serializedOptions, ImmutableMap.of("foo", "runtime foo"));
+ CreateOfProviderOptions runtimeOptions =
+ MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class);
+
+ p.run(runtimeOptions);
+ }
+
+
@Test
public void testCreateGetName() {
assertEquals("Create.Values", Create.of(1, 2, 3).getName());
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 1ed6430..7e40db4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,7 +71,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -99,7 +99,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -611,10 +610,10 @@ public class DatastoreV1 {
if (getQuery() != null) {
inputQuery = input.apply(Create.of(getQuery()));
} else {
- inputQuery = input
- .apply(Create.of(getLiteralGqlQuery())
- .withCoder(SerializableCoder.of(new TypeDescriptor<ValueProvider<String>>() {})))
- .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+ inputQuery =
+ input
+ .apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
+ .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
}
PCollection<KV<Integer, Query>> splitQueries = inputQuery
@@ -730,7 +729,7 @@ public class DatastoreV1 {
/**
* A DoFn that translates a Cloud Datastore gql query string to {@code Query}.
*/
- static class GqlQueryTranslateFn extends DoFn<ValueProvider<String>, Query> {
+ static class GqlQueryTranslateFn extends DoFn<String, Query> {
private final V1Options v1Options;
private transient Datastore datastore;
private final V1DatastoreFactory datastoreFactory;
@@ -751,9 +750,9 @@ public class DatastoreV1 {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- ValueProvider<String> gqlQuery = c.element();
- LOG.info("User query: '{}'", gqlQuery.get());
- Query query = translateGqlQueryWithLimitCheck(gqlQuery.get(), datastore,
+ String gqlQuery = c.element();
+ LOG.info("User query: '{}'", gqlQuery);
+ Query query = translateGqlQueryWithLimitCheck(gqlQuery, datastore,
v1Options.getNamespace());
LOG.info("User gql query translated to Query({})", query);
c.output(query);
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index bf73dbe..51f34ae 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -31,7 +31,9 @@ import javax.annotation.Nullable;
import javax.sql.DataSource;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -272,7 +274,7 @@ public class JdbcIO {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
- @Nullable abstract String getQuery();
+ @Nullable abstract ValueProvider<String> getQuery();
@Nullable abstract StatementPreparator getStatementPreparator();
@Nullable abstract RowMapper<T> getRowMapper();
@Nullable abstract Coder<T> getCoder();
@@ -282,7 +284,7 @@ public class JdbcIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
- abstract Builder<T> setQuery(String query);
+ abstract Builder<T> setQuery(ValueProvider<String> query);
abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);
abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
abstract Builder<T> setCoder(Coder<T> coder);
@@ -297,6 +299,11 @@ public class JdbcIO {
public Read<T> withQuery(String query) {
checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
+ return withQuery(ValueProvider.StaticValueProvider.of(query));
+ }
+
+ public Read<T> withQuery(ValueProvider<String> query) {
+ checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
return toBuilder().setQuery(query).build();
}
@@ -321,7 +328,7 @@ public class JdbcIO {
@Override
public PCollection<T> expand(PBegin input) {
return input
- .apply(Create.of(getQuery()))
+ .apply(Create.ofProvider(getQuery(), StringUtf8Coder.of()))
.apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
.apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
private Random random;