You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:57 UTC
[5/9] beam git commit: Converts PubsubIO.Write to AutoValue
Converts PubsubIO.Write to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df6ef969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df6ef969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df6ef969
Branch: refs/heads/master
Commit: df6ef969d6df5c42d091cc00997b0ed7680315fb
Parents: 9e81548
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:34:11 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 166 +++++++------------
1 file changed, 61 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/df6ef969/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 69a5bd6..5702af1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -461,8 +461,9 @@ public class PubsubIO {
return new AutoValue_PubsubIO_Read.Builder<T>().build();
}
+ /** Returns a {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
public static <T> Write<T> write() {
- return new Write<>();
+ return new AutoValue_PubsubIO_Write.Builder<T>().build();
}
/** Implementation of {@link #read}. */
@@ -696,43 +697,47 @@ public class PubsubIO {
private PubsubIO() {}
- /**
- * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
- * to a Cloud Pub/Sub stream.
- */
- public static class Write<T> extends PTransform<PCollection<T>, PDone> {
-
- /** The Cloud Pub/Sub topic to publish to. */
+ /** Implementation of {@link #write}. */
+ @AutoValue
+ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
@Nullable
- private final ValueProvider<PubsubTopic> topic;
+ abstract ValueProvider<PubsubTopic> getTopicProvider();
+
/** The name of the message attribute to publish message timestamps in. */
@Nullable
- private final String timestampLabel;
+ abstract String getTimestampLabel();
+
/** The name of the message attribute to publish unique message IDs in. */
@Nullable
- private final String idLabel;
+ abstract String getIdLabel();
+
/** The input type Coder. */
- private final Coder<T> coder;
+ @Nullable
+ abstract Coder<T> getCoder();
+
/** The format function for input PubsubMessage objects. */
- SimpleFunction<T, PubsubMessage> formatFn;
+ @Nullable
+ abstract SimpleFunction<T, PubsubMessage> getFormatFn();
- private Write() {
- this(null, null, null, null, null, null);
- }
+ abstract Builder<T> toBuilder();
- private Write(
- String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
- String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
- super(name);
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.coder = coder;
- this.formatFn = formatFn;
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);
+
+ abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+ abstract Builder<T> setIdLabel(String idLabel);
+
+ abstract Builder<T> setCoder(Coder<T> coder);
+
+ abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn);
+
+ abstract Write<T> build();
}
/**
- * Creates a transform that publishes to the specified topic.
+ * Publishes to the specified topic.
*
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
* {@code topic} string.
@@ -745,14 +750,15 @@ public class PubsubIO {
* Like {@code topic()} but with a {@link ValueProvider}.
*/
public Write<T> topic(ValueProvider<String> topic) {
- return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, idLabel, coder, formatFn);
+ return toBuilder()
+ .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+ .build();
}
/**
- * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published
- * messages in an attribute with the specified name. The value of the attribute will be a number
- * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
+ * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
+ * with the specified name. The value of the attribute will be a number representing the number
+ * of milliseconds since the Unix epoch. For example, if using the Joda
* time classes, {@link Instant#Instant(long)} can be used to parse this value.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
@@ -760,32 +766,27 @@ public class PubsubIO {
* these timestamps from the appropriate attribute.
*/
public Write<T> timestampLabel(String timestampLabel) {
- return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setTimestampLabel(timestampLabel).build();
}
/**
- * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
- * published messages in an attribute with the specified name. The value of the attribute is an
- * opaque string.
+ * Writes to Pub/Sub, adding each record's unique identifier to the published messages in an
+ * attribute with the specified name. The value of the attribute is an opaque string.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
* {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that the other source reads
* these unique identifiers from the appropriate attribute.
*/
public Write<T> idLabel(String idLabel) {
- return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setIdLabel(idLabel).build();
}
/**
- * Returns a new transform that's like this one
- * but that uses the given {@link Coder} to encode each of
- * the elements of the input {@link PCollection} into an
- * output record.
- *
- * <p>Does not modify this object.
+ * Uses the given {@link Coder} to encode each of the elements of the input {@link PCollection}
+ * into an output record.
*/
public Write<T> withCoder(Coder<T> coder) {
- return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setCoder(coder).build();
}
/**
@@ -794,12 +795,12 @@ public class PubsubIO {
* to separately set the PubSub message's payload and attributes.
*/
public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
- return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setFormatFn(formatFn).build();
}
@Override
public PDone expand(PCollection<T> input) {
- if (topic == null) {
+ if (getTopicProvider() == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
switch (input.isBounded()) {
@@ -809,11 +810,11 @@ public class PubsubIO {
case UNBOUNDED:
return input.apply(new PubsubUnboundedSink<T>(
FACTORY,
- NestedValueProvider.of(topic, new TopicPathTranslator()),
- coder,
- timestampLabel,
- idLabel,
- formatFn,
+ NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
+ getCoder(),
+ getTimestampLabel(),
+ getIdLabel(),
+ getFormatFn(),
100 /* numShards */));
}
throw new RuntimeException(); // cases are exhaustive.
@@ -822,7 +823,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+ populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
}
@Override
@@ -831,54 +832,6 @@ public class PubsubIO {
}
/**
- * Returns the PubSub topic being written to.
- */
- @Nullable
- public PubsubTopic getTopic() {
- return (topic == null) ? null : topic.get();
- }
-
- /**
- * Returns the {@link ValueProvider} for the topic being written to.
- */
- @Nullable
- public ValueProvider<PubsubTopic> getTopicProvider() {
- return topic;
- }
-
- /**
- * Returns the timestamp label.
- */
- @Nullable
- public String getTimestampLabel() {
- return timestampLabel;
- }
-
- /**
- * Returns the id label.
- */
- @Nullable
- public String getIdLabel() {
- return idLabel;
- }
-
- /**
- * Returns the output coder.
- */
- @Nullable
- public Coder<T> getCoder() {
- return coder;
- }
-
- /**
- * Returns the formatting function used if publishing attributes.
- */
- @Nullable
- public SimpleFunction<T, PubsubMessage> getFormatFn() {
- return formatFn;
- }
-
- /**
* Writer to Pubsub which batches messages from bounded collections.
*
* <p>Public so can be suppressed by runners.
@@ -894,7 +847,7 @@ public class PubsubIO {
this.output = new ArrayList<>();
// NOTE: idLabel is ignored.
this.pubsubClient =
- FACTORY.newClient(timestampLabel, null,
+ FACTORY.newClient(getTimestampLabel(), null,
c.getPipelineOptions().as(PubsubOptions.class));
}
@@ -902,8 +855,8 @@ public class PubsubIO {
public void processElement(ProcessContext c) throws IOException {
byte[] payload = null;
Map<String, String> attributes = null;
- if (formatFn != null) {
- PubsubMessage message = formatFn.apply(c.element());
+ if (getFormatFn() != null) {
+ PubsubMessage message = getFormatFn().apply(c.element());
payload = message.getMessage();
attributes = message.getAttributeMap();
} else {
@@ -930,9 +883,12 @@ public class PubsubIO {
}
private void publish() throws IOException {
- int n = pubsubClient.publish(
- PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
- output);
+ PubsubTopic topic = getTopicProvider().get();
+ int n =
+ pubsubClient.publish(
+ PubsubClient.topicPathFromName(
+ topic.project, topic.topic),
+ output);
checkState(n == output.size());
output.clear();
}