You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:56 UTC
[4/9] beam git commit: Converts PubsubIO.Read to AutoValue
Converts PubsubIO.Read to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4d04606
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4d04606
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4d04606
Branch: refs/heads/master
Commit: f4d04606c105ca45a7754516781cb72b4c818baf
Parents: f5e3f52
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:14:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 244 +++++++------------
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 8 +-
2 files changed, 95 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index f0926d4..3c76942 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -455,64 +456,61 @@ public class PubsubIO {
}
}
+ /** Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
public static <T> Read<T> read() {
- return new Read<>();
+ return new AutoValue_PubsubIO_Read.Builder<T>().build();
}
public static <T> Write<T> write() {
return new Write<>();
}
- /**
- * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
- * returns a {@link PCollection} of {@link String Strings} containing the items from
- * the stream.
- */
- public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
- /** The Cloud Pub/Sub topic to read from. */
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable
- private final ValueProvider<PubsubTopic> topic;
+ abstract ValueProvider<PubsubTopic> getTopicProvider();
- /** The Cloud Pub/Sub subscription to read from. */
@Nullable
- private final ValueProvider<PubsubSubscription> subscription;
+ abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();
/** The name of the message attribute to read timestamps from. */
@Nullable
- private final String timestampLabel;
+ abstract String getTimestampLabel();
/** The name of the message attribute to read unique message IDs from. */
@Nullable
- private final String idLabel;
+ abstract String getIdLabel();
/** The coder used to decode each record. */
@Nullable
- private final Coder<T> coder;
+ abstract Coder<T> getCoder();
/** User function for parsing PubsubMessage object. */
- SimpleFunction<PubsubMessage, T> parseFn;
+ @Nullable
+ abstract SimpleFunction<PubsubMessage, T> getParseFn();
- private Read() {
- this(null, null, null, null, null, null, null);
- }
+ abstract Builder<T> toBuilder();
- private Read(String name, ValueProvider<PubsubSubscription> subscription,
- ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
- String idLabel,
- SimpleFunction<PubsubMessage, T> parseFn) {
- super(name);
- this.subscription = subscription;
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.coder = coder;
- this.idLabel = idLabel;
- this.parseFn = parseFn;
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);
+
+ abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
+
+ abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+ abstract Builder<T> setIdLabel(String idLabel);
+
+ abstract Builder<T> setCoder(Coder<T> coder);
+
+ abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
+
+ abstract Read<T> build();
}
/**
- * Returns a transform that's like this one but reading from the
- * given subscription.
+ * Reads from the given subscription.
*
* <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
* of the {@code subscription} string.
@@ -520,8 +518,6 @@ public class PubsubIO {
* <p>Multiple readers reading from the same subscription will each receive
* some arbitrary portion of the data. Most likely, separate readers should
* use their own subscriptions.
- *
- * <p>Does not modify this object.
*/
public Read<T> subscription(String subscription) {
return subscription(StaticValueProvider.of(subscription));
@@ -535,9 +531,12 @@ public class PubsubIO {
// Validate.
PubsubSubscription.fromPath(subscription.get());
}
- return new Read<>(
- name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
- null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
+ return toBuilder()
+ .setSubscriptionProvider(
+ NestedValueProvider.of(subscription, new SubscriptionTranslator()))
+ /* reset topic to null */
+ .setTopicProvider(null)
+ .build();
}
/**
@@ -563,15 +562,16 @@ public class PubsubIO {
// Validate.
PubsubTopic.fromPath(topic.get());
}
- return new Read<>(name, null /* reset subscription to null */,
- NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, coder, idLabel, parseFn);
+ return toBuilder()
+ .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+ /* reset subscription to null */
+ .setSubscriptionProvider(null)
+ .build();
}
/**
- * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
- * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
- * parameter specifies the name of the attribute that contains the timestamp.
+ * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
+ * attributes, specifies the name of the attribute that contains the timestamp.
*
* <p>The timestamp value is expected to be represented in the attribute as either:
*
@@ -599,88 +599,90 @@ public class PubsubIO {
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
public Read<T> timestampLabel(String timestampLabel) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setTimestampLabel(timestampLabel).build();
}
/**
- * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
- * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
- * parameter specifies the attribute name. The value of the attribute can be any string
- * that uniquely identifies this record.
+ * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
+ * message attributes, specifies the name of the attribute containing the unique identifier.
+ * The value of the attribute can be any string that uniquely identifies this record.
*
* <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
* If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
* be delivered, and deduplication of the stream will be strictly best effort.
*/
public Read<T> idLabel(String idLabel) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setIdLabel(idLabel).build();
}
/**
- * Returns a transform that's like this one but that uses the given
- * {@link Coder} to decode each record into a value of type {@code T}.
- *
- * <p>Does not modify this object.
+ * Uses the given {@link Coder} to decode each record into a value of type {@code T}.
*/
public Read<T> withCoder(Coder<T> coder) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setCoder(coder).build();
}
/**
- * Causes the source to return a PubsubMessage that includes Pubsub attributes.
- * The user must supply a parsing function to transform the PubsubMessage into an output type.
+ * Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
+ * given parsing function to transform the PubsubMessage into an output type.
* A Coder for the output type T must be registered or set on the output via
* {@link PCollection#setCoder(Coder)}.
*/
public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
- return new Read<T>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setParseFn(parseFn).build();
}
@Override
public PCollection<T> expand(PBegin input) {
- if (topic == null && subscription == null) {
- throw new IllegalStateException("Need to set either the topic or the subscription for "
- + "a PubsubIO.Read transform");
+ if (getTopicProvider() == null && getSubscriptionProvider() == null) {
+ throw new IllegalStateException(
+ "Need to set either the topic or the subscription for " + "a PubsubIO.Read transform");
}
- if (topic != null && subscription != null) {
- throw new IllegalStateException("Can't set both the topic and the subscription for "
- + "a PubsubIO.Read transform");
+ if (getTopicProvider() != null && getSubscriptionProvider() != null) {
+ throw new IllegalStateException(
+ "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform");
}
- if (coder == null) {
- throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
- + "the withCoder method.");
+ if (getCoder() == null) {
+ throw new IllegalStateException(
+ "PubsubIO.Read requires that a coder be set using " + "the withCoder method.");
}
- @Nullable ValueProvider<ProjectPath> projectPath =
- topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
- @Nullable ValueProvider<TopicPath> topicPath =
- topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
- @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
- subscription == null
+ @Nullable
+ ValueProvider<ProjectPath> projectPath =
+ getTopicProvider() == null
? null
- : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
- PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
- FACTORY, projectPath, topicPath, subscriptionPath,
- coder, timestampLabel, idLabel, parseFn);
+ : NestedValueProvider.of(getTopicProvider(), new ProjectPathTranslator());
+ @Nullable
+ ValueProvider<TopicPath> topicPath =
+ getTopicProvider() == null
+ ? null
+ : NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator());
+ @Nullable
+ ValueProvider<SubscriptionPath> subscriptionPath =
+ getSubscriptionProvider() == null
+ ? null
+ : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
+ PubsubUnboundedSource<T> source =
+ new PubsubUnboundedSource<T>(
+ FACTORY,
+ projectPath,
+ topicPath,
+ subscriptionPath,
+ getCoder(),
+ getTimestampLabel(),
+ getIdLabel(),
+ getParseFn());
return input.getPipeline().apply(source);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+ populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
- if (subscription != null) {
- String subscriptionString = subscription.isAccessible()
- ? subscription.get().asPath() : subscription.toString();
+ if (getSubscriptionProvider() != null) {
+ String subscriptionString = getSubscriptionProvider().isAccessible()
+ ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString();
builder.add(DisplayData.item("subscription", subscriptionString)
.withLabel("Pubsub Subscription"));
}
@@ -688,72 +690,8 @@ public class PubsubIO {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return coder;
+ return getCoder();
}
-
- /**
- * Get the topic being read from.
- */
- @Nullable
- public PubsubTopic getTopic() {
- return topic == null ? null : topic.get();
- }
-
- /**
- * Get the {@link ValueProvider} for the topic being read from.
- */
- public ValueProvider<PubsubTopic> getTopicProvider() {
- return topic;
- }
-
- /**
- * Get the subscription being read from.
- */
- @Nullable
- public PubsubSubscription getSubscription() {
- return subscription == null ? null : subscription.get();
- }
-
- /**
- * Get the {@link ValueProvider} for the subscription being read from.
- */
- public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
- return subscription;
- }
-
- /**
- * Get the timestamp label.
- */
- @Nullable
- public String getTimestampLabel() {
- return timestampLabel;
- }
-
- /**
- * Get the id label.
- */
- @Nullable
- public String getIdLabel() {
- return idLabel;
- }
-
-
- /**
- * Get the {@link Coder} used for the transform's output.
- */
- @Nullable
- public Coder<T> getCoder() {
- return coder;
- }
-
- /**
- * Get the parse function used for PubSub attributes.
- */
- @Nullable
- public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
- return parseFn;
- }
-
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 6e9922c..7fe6e26 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -126,8 +126,8 @@ public class PubsubIOTest {
String subscription = "projects/project/subscriptions/subscription";
PubsubIO.Read<String> read = PubsubIO.<String>read()
.subscription(StaticValueProvider.of(subscription));
- assertNull(read.getTopic());
- assertNotNull(read.getSubscription());
+ assertNull(read.getTopicProvider());
+ assertNotNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
}
@@ -136,8 +136,8 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
PubsubIO.Read<String> read = PubsubIO.<String>read()
.topic(StaticValueProvider.of(topic));
- assertNotNull(read.getTopic());
- assertNull(read.getSubscription());
+ assertNotNull(read.getTopicProvider());
+ assertNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
}