You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/27 22:24:04 UTC
[3/4] beam git commit: [BEAM-2640] Introduces Create.ofProvider(ValueProvider)
[BEAM-2640] Introduces Create.ofProvider(ValueProvider)
I also converted DatastoreV1 to use this overload, and, as an
exercise, added a withQuery(ValueProvider) overload to JdbcIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f515c22d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f515c22d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f515c22d
Branch: refs/heads/master
Commit: f515c22d6bd583cb97fb33c6537c1ecc6513141a
Parents: 5727ad2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 19 11:50:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 27 14:43:44 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/options/ValueProvider.java | 6 +++
.../apache/beam/sdk/options/ValueProviders.java | 2 +-
.../apache/beam/sdk/testing/TestPipeline.java | 7 ++-
.../org/apache/beam/sdk/transforms/Create.java | 42 ++++++++++++++++
.../apache/beam/sdk/transforms/CreateTest.java | 52 ++++++++++++++++++++
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 19 ++++---
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 13 +++--
7 files changed, 126 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index c7f1e09..94187a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -41,13 +41,19 @@ import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
/**
* A {@link ValueProvider} abstracts the notion of fetching a value that may or may not be currently
* available.
*
* <p>This can be used to parameterize transforms that only read values in at runtime, for example.
+ *
+ * <p>A common task is to create a {@link PCollection} containing the value of this
+ * {@link ValueProvider} regardless of whether it's accessible at construction time or not.
+ * For that, use {@link Create#ofProvider}.
*/
@JsonSerialize(using = ValueProvider.Serializer.class)
@JsonDeserialize(using = ValueProvider.Deserializer.class)
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index 1cc46fe..bc479a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
* Utilities for working with the {@link ValueProvider} interface.
*/
-class ValueProviders {
+public class ValueProviders {
private ValueProviders() {}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 34f1c83..b67b14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -328,6 +328,11 @@ public class TestPipeline extends Pipeline implements TestRule {
* testing.
*/
public PipelineResult run() {
+ return run(getOptions());
+ }
+
+ /** Like {@link #run} but with the given potentially modified options. */
+ public PipelineResult run(PipelineOptions options) {
checkState(
enforcement.isPresent(),
"Is your TestPipeline declaration missing a @Rule annotation? Usage: "
@@ -336,7 +341,7 @@ public class TestPipeline extends Pipeline implements TestRule {
final PipelineResult pipelineResult;
try {
enforcement.get().beforePipelineExecution();
- pipelineResult = super.run();
+ pipelineResult = super.run(options);
verifyPAssertsSucceeded(this, pipelineResult);
} catch (RuntimeException exc) {
Throwable cause = exc.getCause();
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 7af8fb8..09e12ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -52,6 +53,7 @@ import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
@@ -200,6 +202,14 @@ public class Create<T> {
}
/**
+ * Returns an {@link OfValueProvider} transform that produces a {@link PCollection}
+ * of a single element provided by the given {@link ValueProvider}.
+ */
+ public static <T> OfValueProvider<T> ofProvider(ValueProvider<T> provider, Coder<T> coder) {
+ return new OfValueProvider<>(provider, coder);
+ }
+
+ /**
* Returns a new {@link Create.TimestampedValues} transform that produces a
* {@link PCollection} containing the elements of the provided {@code Iterable}
* with the specified timestamps.
@@ -485,6 +495,38 @@ public class Create<T> {
/////////////////////////////////////////////////////////////////////////////
+ /** Implementation of {@link #ofProvider}. */
+ public static class OfValueProvider<T> extends PTransform<PBegin, PCollection<T>> {
+ private final ValueProvider<T> provider;
+ private final Coder<T> coder;
+
+ private OfValueProvider(ValueProvider<T> provider, Coder<T> coder) {
+ this.provider = checkNotNull(provider, "provider");
+ this.coder = checkNotNull(coder, "coder");
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ if (provider.isAccessible()) {
+ Values<T> values = Create.of(provider.get());
+ return input.apply(values.withCoder(coder));
+ }
+ return input
+ .apply(Create.of((Void) null))
+ .apply(
+ MapElements.via(
+ new SimpleFunction<Void, T>() {
+ @Override
+ public T apply(Void input) {
+ return provider.get();
+ }
+ }))
+ .setCoder(coder);
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
/**
* A {@code PTransform} that creates a {@code PCollection} whose elements have
* associated timestamps.
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a05d31c..1e7ce2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -25,7 +25,9 @@ import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
@@ -47,6 +49,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.options.ValueProviders;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -54,6 +60,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -353,6 +360,51 @@ public class CreateTest {
p.run();
}
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+ public interface CreateOfProviderOptions extends PipelineOptions {
+ ValueProvider<String> getFoo();
+ void setFoo(ValueProvider<String> value);
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testCreateOfProvider() throws Exception {
+ PAssert.that(
+ p.apply(
+ "Static", Create.ofProvider(StaticValueProvider.of("foo"), StringUtf8Coder.of())))
+ .containsInAnyOrder("foo");
+ PAssert.that(
+ p.apply(
+ "Static nested",
+ Create.ofProvider(
+ NestedValueProvider.of(
+ StaticValueProvider.of("foo"),
+ new SerializableFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input + "bar";
+ }
+ }),
+ StringUtf8Coder.of())))
+ .containsInAnyOrder("foobar");
+ CreateOfProviderOptions submitOptions =
+ p.getOptions().as(CreateOfProviderOptions.class);
+ PAssert.that(
+ p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of())))
+ .containsInAnyOrder("runtime foo");
+
+ String serializedOptions = MAPPER.writeValueAsString(p.getOptions());
+ String runnerString = ValueProviders.updateSerializedOptions(
+ serializedOptions, ImmutableMap.of("foo", "runtime foo"));
+ CreateOfProviderOptions runtimeOptions =
+ MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class);
+
+ p.run(runtimeOptions);
+ }
+
+
@Test
public void testCreateGetName() {
assertEquals("Create.Values", Create.of(1, 2, 3).getName());
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 1ed6430..7e40db4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,7 +71,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -99,7 +99,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -611,10 +610,10 @@ public class DatastoreV1 {
if (getQuery() != null) {
inputQuery = input.apply(Create.of(getQuery()));
} else {
- inputQuery = input
- .apply(Create.of(getLiteralGqlQuery())
- .withCoder(SerializableCoder.of(new TypeDescriptor<ValueProvider<String>>() {})))
- .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+ inputQuery =
+ input
+ .apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
+ .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
}
PCollection<KV<Integer, Query>> splitQueries = inputQuery
@@ -730,7 +729,7 @@ public class DatastoreV1 {
/**
* A DoFn that translates a Cloud Datastore gql query string to {@code Query}.
*/
- static class GqlQueryTranslateFn extends DoFn<ValueProvider<String>, Query> {
+ static class GqlQueryTranslateFn extends DoFn<String, Query> {
private final V1Options v1Options;
private transient Datastore datastore;
private final V1DatastoreFactory datastoreFactory;
@@ -751,9 +750,9 @@ public class DatastoreV1 {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- ValueProvider<String> gqlQuery = c.element();
- LOG.info("User query: '{}'", gqlQuery.get());
- Query query = translateGqlQueryWithLimitCheck(gqlQuery.get(), datastore,
+ String gqlQuery = c.element();
+ LOG.info("User query: '{}'", gqlQuery);
+ Query query = translateGqlQueryWithLimitCheck(gqlQuery, datastore,
v1Options.getNamespace());
LOG.info("User gql query translated to Query({})", query);
c.output(query);
http://git-wip-us.apache.org/repos/asf/beam/blob/f515c22d/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index bf73dbe..51f34ae 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -31,7 +31,9 @@ import javax.annotation.Nullable;
import javax.sql.DataSource;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -272,7 +274,7 @@ public class JdbcIO {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
- @Nullable abstract String getQuery();
+ @Nullable abstract ValueProvider<String> getQuery();
@Nullable abstract StatementPreparator getStatementPreparator();
@Nullable abstract RowMapper<T> getRowMapper();
@Nullable abstract Coder<T> getCoder();
@@ -282,7 +284,7 @@ public class JdbcIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
- abstract Builder<T> setQuery(String query);
+ abstract Builder<T> setQuery(ValueProvider<String> query);
abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);
abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
abstract Builder<T> setCoder(Coder<T> coder);
@@ -297,6 +299,11 @@ public class JdbcIO {
public Read<T> withQuery(String query) {
checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
+ return withQuery(ValueProvider.StaticValueProvider.of(query));
+ }
+
+ public Read<T> withQuery(ValueProvider<String> query) {
+ checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
return toBuilder().setQuery(query).build();
}
@@ -321,7 +328,7 @@ public class JdbcIO {
@Override
public PCollection<T> expand(PBegin input) {
return input
- .apply(Create.of(getQuery()))
+ .apply(Create.ofProvider(getQuery(), StringUtf8Coder.of()))
.apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
.apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
private Random random;