You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/20 23:00:09 UTC
beam git commit: Automated refactoring of PubsubIO to fix indentation
Repository: beam
Updated Branches:
refs/heads/master c6e46b655 -> f799a57af
Automated refactoring of PubsubIO to fix indentation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f799a57a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f799a57a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f799a57a
Branch: refs/heads/master
Commit: f799a57af14722e1da26baee25bda03bf6a52b6e
Parents: c6e46b6
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jan 20 15:00:05 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jan 20 15:00:05 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 803 ++++++++++---------
1 file changed, 412 insertions(+), 391 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f799a57a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 2802871..1471953 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
* {@link PipelineRunner PipelineRunners} for more details.
*/
public class PubsubIO {
+
private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
/** Factory for creating pubsub client to manage transport. */
@@ -157,6 +158,7 @@ public class PubsubIO {
* a map of attached attributes.
*/
public static class PubsubMessage {
+
private byte[] message;
private Map<String, String> attributes;
@@ -193,7 +195,8 @@ public class PubsubIO {
* Class representing a Cloud Pub/Sub Subscription.
*/
public static class PubsubSubscription implements Serializable {
- private enum Type { NORMAL, FAKE }
+
+ private enum Type {NORMAL, FAKE}
private final Type type;
private final String project;
@@ -299,6 +302,7 @@ public class PubsubIO {
*/
private static class SubscriptionTranslator
implements SerializableFunction<String, PubsubSubscription> {
+
@Override
public PubsubSubscription apply(String from) {
return PubsubSubscription.fromPath(from);
@@ -310,6 +314,7 @@ public class PubsubIO {
*/
private static class SubscriptionPathTranslator
implements SerializableFunction<PubsubSubscription, SubscriptionPath> {
+
@Override
public SubscriptionPath apply(PubsubSubscription from) {
return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
@@ -321,6 +326,7 @@ public class PubsubIO {
*/
private static class TopicTranslator
implements SerializableFunction<String, PubsubTopic> {
+
@Override
public PubsubTopic apply(String from) {
return PubsubTopic.fromPath(from);
@@ -332,6 +338,7 @@ public class PubsubIO {
*/
private static class TopicPathTranslator
implements SerializableFunction<PubsubTopic, TopicPath> {
+
@Override
public TopicPath apply(PubsubTopic from) {
return PubsubClient.topicPathFromName(from.project, from.topic);
@@ -343,6 +350,7 @@ public class PubsubIO {
*/
private static class ProjectPathTranslator
implements SerializableFunction<PubsubTopic, ProjectPath> {
+
@Override
public ProjectPath apply(PubsubTopic from) {
return PubsubClient.projectPathFromId(from.project);
@@ -353,7 +361,8 @@ public class PubsubIO {
* Class representing a Cloud Pub/Sub Topic.
*/
public static class PubsubTopic implements Serializable {
- private enum Type { NORMAL, FAKE }
+
+ private enum Type {NORMAL, FAKE}
private final Type type;
private final String project;
@@ -471,451 +480,458 @@ public class PubsubIO {
* can be processed. As such, either {@link PubsubIO.Read#maxNumRecords(int)} or
* {@link PubsubIO.Read#maxReadTime(Duration)} must be set.
*/
- public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
- /** The Cloud Pub/Sub topic to read from. */
- @Nullable private final ValueProvider<PubsubTopic> topic;
+ public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
- /** The Cloud Pub/Sub subscription to read from. */
- @Nullable private final ValueProvider<PubsubSubscription> subscription;
+ /** The Cloud Pub/Sub topic to read from. */
+ @Nullable
+ private final ValueProvider<PubsubTopic> topic;
- /** The name of the message attribute to read timestamps from. */
- @Nullable private final String timestampLabel;
+ /** The Cloud Pub/Sub subscription to read from. */
+ @Nullable
+ private final ValueProvider<PubsubSubscription> subscription;
- /** The name of the message attribute to read unique message IDs from. */
- @Nullable private final String idLabel;
+ /** The name of the message attribute to read timestamps from. */
+ @Nullable
+ private final String timestampLabel;
- /** The coder used to decode each record. */
- @Nullable private final Coder<T> coder;
+ /** The name of the message attribute to read unique message IDs from. */
+ @Nullable
+ private final String idLabel;
- /** Stop after reading this many records. */
- private final int maxNumRecords;
+ /** The coder used to decode each record. */
+ @Nullable
+ private final Coder<T> coder;
- /** Stop after reading for this much time. */
- @Nullable private final Duration maxReadTime;
+ /** Stop after reading this many records. */
+ private final int maxNumRecords;
- /** User function for parsing PubsubMessage object. */
- SimpleFunction<PubsubMessage, T> parseFn;
+ /** Stop after reading for this much time. */
+ @Nullable
+ private final Duration maxReadTime;
- private Read() {
- this(null, null, null, null, null, null, 0, null, null);
- }
+ /** User function for parsing PubsubMessage object. */
+ SimpleFunction<PubsubMessage, T> parseFn;
- private Read(String name, ValueProvider<PubsubSubscription> subscription,
- ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
- String idLabel, int maxNumRecords, Duration maxReadTime,
- SimpleFunction<PubsubMessage, T> parseFn) {
- super(name);
- this.subscription = subscription;
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.coder = coder;
- this.idLabel = idLabel;
- this.maxNumRecords = maxNumRecords;
- this.maxReadTime = maxReadTime;
- this.parseFn = parseFn;
- }
+ private Read() {
+ this(null, null, null, null, null, null, 0, null, null);
+ }
- /**
- * Returns a transform that's like this one but reading from the
- * given subscription.
- *
- * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
- * of the {@code subscription} string.
- *
- * <p>Multiple readers reading from the same subscription will each receive
- * some arbitrary portion of the data. Most likely, separate readers should
- * use their own subscriptions.
- *
- * <p>Does not modify this object.
- */
- public Read<T> subscription(String subscription) {
- return subscription(StaticValueProvider.of(subscription));
- }
+ private Read(String name, ValueProvider<PubsubSubscription> subscription,
+ ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
+ String idLabel, int maxNumRecords, Duration maxReadTime,
+ SimpleFunction<PubsubMessage, T> parseFn) {
+ super(name);
+ this.subscription = subscription;
+ this.topic = topic;
+ this.timestampLabel = timestampLabel;
+ this.coder = coder;
+ this.idLabel = idLabel;
+ this.maxNumRecords = maxNumRecords;
+ this.maxReadTime = maxReadTime;
+ this.parseFn = parseFn;
+ }
- /**
- * Like {@code subscription()} but with a {@link ValueProvider}.
- */
- public Read<T> subscription(ValueProvider<String> subscription) {
- if (subscription.isAccessible()) {
- // Validate.
- PubsubSubscription.fromPath(subscription.get());
- }
- return new Read<>(name,
- NestedValueProvider.of(subscription, new SubscriptionTranslator()),
- topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
- }
+ /**
+ * Returns a transform that's like this one but reading from the
+ * given subscription.
+ *
+ * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
+ * of the {@code subscription} string.
+ *
+ * <p>Multiple readers reading from the same subscription will each receive
+ * some arbitrary portion of the data. Most likely, separate readers should
+ * use their own subscriptions.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read<T> subscription(String subscription) {
+ return subscription(StaticValueProvider.of(subscription));
+ }
- /**
- * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
- * with {@link #subscription(String)}.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
- * of the {@code topic} string.
- *
- * <p>The Beam runner will start reading data published on this topic from the time the
- * pipeline is started. Any data published on the topic before the pipeline is started will
- * not be read by the runner.
- */
- public Read<T> topic(String topic) {
- return topic(StaticValueProvider.of(topic));
+ /**
+ * Like {@code subscription()} but with a {@link ValueProvider}.
+ */
+ public Read<T> subscription(ValueProvider<String> subscription) {
+ if (subscription.isAccessible()) {
+ // Validate.
+ PubsubSubscription.fromPath(subscription.get());
}
+ return new Read<>(name,
+ NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+ topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+ }
- /**
- * Like {@code topic()} but with a {@link ValueProvider}.
- */
- public Read<T> topic(ValueProvider<String> topic) {
- if (topic.isAccessible()) {
- // Validate.
- PubsubTopic.fromPath(topic.get());
- }
- return new Read<>(name, subscription,
- NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
- }
+ /**
+ * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
+ * with {@link #subscription(String)}.
+ *
+ * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
+ * of the {@code topic} string.
+ *
+ * <p>The Beam runner will start reading data published on this topic from the time the
+ * pipeline is started. Any data published on the topic before the pipeline is started will
+ * not be read by the runner.
+ */
+ public Read<T> topic(String topic) {
+ return topic(StaticValueProvider.of(topic));
+ }
- /**
- * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
- * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
- * parameter specifies the name of the attribute that contains the timestamp.
- *
- * <p>The timestamp value is expected to be represented in the attribute as either:
- *
- * <ul>
- * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
- * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
- * value for this attribute.
- * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
- * sub-second component of the timestamp is optional, and digits beyond the first three
- * (i.e., time units smaller than milliseconds) will be ignored.
- * </ul>
- *
- * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
- * the first time it sees each record. All windowing will be done relative to these
- * timestamps.
- *
- * <p>By default, windows are emitted based on an estimate of when this source is likely
- * done producing data for a given timestamp (referred to as the Watermark; see
- * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
- * specified with the windowing strategy – by default it will be output immediately.
- *
- * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
- * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
- *
- * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
- */
- public Read<T> timestampLabel(String timestampLabel) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
- parseFn);
+ /**
+ * Like {@code topic()} but with a {@link ValueProvider}.
+ */
+ public Read<T> topic(ValueProvider<String> topic) {
+ if (topic.isAccessible()) {
+ // Validate.
+ PubsubTopic.fromPath(topic.get());
}
+ return new Read<>(name, subscription,
+ NestedValueProvider.of(topic, new TopicTranslator()),
+ timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+ }
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
- * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
- * parameter specifies the attribute name. The value of the attribute can be any string
- * that uniquely identifies this record.
- *
- * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
- * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
- * be delivered, and deduplication of the stream will be strictly best effort.
- */
- public Read<T> idLabel(String idLabel) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
- parseFn);
- }
+ /**
+ * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
+ * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
+ * parameter specifies the name of the attribute that contains the timestamp.
+ *
+ * <p>The timestamp value is expected to be represented in the attribute as either:
+ *
+ * <ul>
+ * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
+ * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
+ * value for this attribute.
+ * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
+ * sub-second component of the timestamp is optional, and digits beyond the first three
+ * (i.e., time units smaller than milliseconds) will be ignored.
+ * </ul>
+ *
+ * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+ * the first time it sees each record. All windowing will be done relative to these
+ * timestamps.
+ *
+ * <p>By default, windows are emitted based on an estimate of when this source is likely
+ * done producing data for a given timestamp (referred to as the Watermark; see
+ * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
+ * specified with the windowing strategy – by default it will be output immediately.
+ *
+ * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
+ * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+ *
+ * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
+ */
+ public Read<T> timestampLabel(String timestampLabel) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
+ }
- /**
- * Returns a transform that's like this one but that uses the given
- * {@link Coder} to decode each record into a value of type {@code T}.
- *
- * <p>Does not modify this object.
- */
- public Read<T> withCoder(Coder<T> coder) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
- parseFn);
- }
+ /**
+ * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
+ * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
+ * parameter specifies the attribute name. The value of the attribute can be any string
+ * that uniquely identifies this record.
+ *
+ * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
+ * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+ * be delivered, and deduplication of the stream will be strictly best effort.
+ */
+ public Read<T> idLabel(String idLabel) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
+ }
- /**
- * Causes the source to return a PubsubMessage that includes Pubsub attributes.
- * The user must supply a parsing function to transform the PubsubMessage into an output type.
- * A Coder for the output type T must be registered or set on the output via
- * {@link PCollection#setCoder(Coder)}.
- */
- public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
- return new Read<T>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- maxNumRecords, maxReadTime, parseFn);
- }
+ /**
+ * Returns a transform that's like this one but that uses the given
+ * {@link Coder} to decode each record into a value of type {@code T}.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read<T> withCoder(Coder<T> coder) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
+ }
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
- * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
- *
- * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
- * bounded source.
- */
- public Read<T> maxNumRecords(int maxNumRecords) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
- parseFn);
- }
+ /**
+ * Causes the source to return a PubsubMessage that includes Pubsub attributes.
+ * The user must supply a parsing function to transform the PubsubMessage into an output type.
+ * A Coder for the output type T must be registered or set on the output via
+ * {@link PCollection#setCoder(Coder)}.
+ */
+ public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+ return new Read<T>(
+ name, subscription, topic, timestampLabel, coder, idLabel,
+ maxNumRecords, maxReadTime, parseFn);
+ }
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
- * duration during which records will be read. The transform produces a <i>bounded</i>
- * {@link PCollection}.
- *
- * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
- * bounded source.
- */
- public Read<T> maxReadTime(Duration maxReadTime) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
- parseFn);
- }
+ /**
+ * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
+ * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
+ *
+ * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
+ * bounded source.
+ */
+ public Read<T> maxNumRecords(int maxNumRecords) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
+ }
- @Override
- public PCollection<T> expand(PBegin input) {
- if (topic == null && subscription == null) {
- throw new IllegalStateException("Need to set either the topic or the subscription for "
- + "a PubsubIO.Read transform");
- }
- if (topic != null && subscription != null) {
- throw new IllegalStateException("Can't set both the topic and the subscription for "
- + "a PubsubIO.Read transform");
- }
- if (coder == null) {
- throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
- + "the withCoder method.");
- }
+ /**
+ * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
+ * duration during which records will be read. The transform produces a <i>bounded</i>
+ * {@link PCollection}.
+ *
+ * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
+ * bounded source.
+ */
+ public Read<T> maxReadTime(Duration maxReadTime) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
+ }
- boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ if (topic == null && subscription == null) {
+ throw new IllegalStateException("Need to set either the topic or the subscription for "
+ + "a PubsubIO.Read transform");
+ }
+ if (topic != null && subscription != null) {
+ throw new IllegalStateException("Can't set both the topic and the subscription for "
+ + "a PubsubIO.Read transform");
+ }
+ if (coder == null) {
+ throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
+ + "the withCoder method.");
+ }
- if (boundedOutput) {
- return input.getPipeline().begin()
- .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.of(new PubsubBoundedReader()))
- .setCoder(coder);
- } else {
- @Nullable ValueProvider<ProjectPath> projectPath =
- topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
- @Nullable ValueProvider<TopicPath> topicPath =
- topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
- @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
- subscription == null
- ? null
- : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
- return input.getPipeline().begin()
- .apply(new PubsubUnboundedSource<T>(
- FACTORY, projectPath, topicPath, subscriptionPath,
- coder, timestampLabel, idLabel, parseFn));
- }
+ boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
+
+ if (boundedOutput) {
+ return input.getPipeline().begin()
+ .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+ .apply(ParDo.of(new PubsubBoundedReader()))
+ .setCoder(coder);
+ } else {
+ @Nullable ValueProvider<ProjectPath> projectPath =
+ topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
+ @Nullable ValueProvider<TopicPath> topicPath =
+ topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
+ @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
+ subscription == null
+ ? null
+ : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
+ return input.getPipeline().begin()
+ .apply(new PubsubUnboundedSource<T>(
+ FACTORY, projectPath, topicPath, subscriptionPath,
+ coder, timestampLabel, idLabel, parseFn));
}
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
- builder
- .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
+ builder
+ .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
.withLabel("Maximum Read Time"))
- .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
+ .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
.withLabel("Maximum Read Records"), 0);
- if (subscription != null) {
- String subscriptionString = subscription.isAccessible()
- ? subscription.get().asPath() : subscription.toString();
- builder.add(DisplayData.item("subscription", subscriptionString)
- .withLabel("Pubsub Subscription"));
- }
+ if (subscription != null) {
+ String subscriptionString = subscription.isAccessible()
+ ? subscription.get().asPath() : subscription.toString();
+ builder.add(DisplayData.item("subscription", subscriptionString)
+ .withLabel("Pubsub Subscription"));
}
+ }
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return coder;
- }
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return coder;
+ }
- /**
- * Get the topic being read from.
- */
- public PubsubTopic getTopic() {
- return topic == null ? null : topic.get();
- }
+ /**
+ * Get the topic being read from.
+ */
+ public PubsubTopic getTopic() {
+ return topic == null ? null : topic.get();
+ }
- /**
- * Get the {@link ValueProvider} for the topic being read from.
- */
- public ValueProvider<PubsubTopic> getTopicProvider() {
- return topic;
- }
+ /**
+ * Get the {@link ValueProvider} for the topic being read from.
+ */
+ public ValueProvider<PubsubTopic> getTopicProvider() {
+ return topic;
+ }
- /**
- * Get the subscription being read from.
- */
- public PubsubSubscription getSubscription() {
- return subscription == null ? null : subscription.get();
- }
+ /**
+ * Get the subscription being read from.
+ */
+ public PubsubSubscription getSubscription() {
+ return subscription == null ? null : subscription.get();
+ }
- /**
- * Get the {@link ValueProvider} for the subscription being read from.
- */
- public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
- return subscription;
- }
+ /**
+ * Get the {@link ValueProvider} for the subscription being read from.
+ */
+ public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
+ return subscription;
+ }
- /**
- * Get the timestamp label.
- */
- public String getTimestampLabel() {
- return timestampLabel;
- }
+ /**
+ * Get the timestamp label.
+ */
+ public String getTimestampLabel() {
+ return timestampLabel;
+ }
- /**
- * Get the id label.
- */
- public String getIdLabel() {
- return idLabel;
- }
+ /**
+ * Get the id label.
+ */
+ public String getIdLabel() {
+ return idLabel;
+ }
- /**
- * Get the {@link Coder} used for the transform's output.
- */
- public Coder<T> getCoder() {
+ /**
+ * Get the {@link Coder} used for the transform's output.
+ */
+ public Coder<T> getCoder() {
return coder;
}
- /**
- * Get the maximum number of records to read.
- */
- public int getMaxNumRecords() {
- return maxNumRecords;
- }
+ /**
+ * Get the maximum number of records to read.
+ */
+ public int getMaxNumRecords() {
+ return maxNumRecords;
+ }
- /**
- * Get the maximum read time.
- */
- public Duration getMaxReadTime() {
- return maxReadTime;
- }
+ /**
+ * Get the maximum read time.
+ */
+ public Duration getMaxReadTime() {
+ return maxReadTime;
+ }
- /**
- * Get the parse function used for PubSub attributes.
- */
- public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
- return parseFn;
- }
+ /**
+ * Get the parse function used for PubSub attributes.
+ */
+ public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
+ return parseFn;
+ }
- /**
- * Default reader when Pubsub subscription has some form of upper bound.
- *
- * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
- * of PubsubUnboundedSource.
- *
- * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
- * service in streaming mode.
- *
- * <p>Public so can be suppressed by runners.
- */
- public class PubsubBoundedReader extends DoFn<Void, T> {
- private static final int DEFAULT_PULL_SIZE = 100;
- private static final int ACK_TIMEOUT_SEC = 60;
-
- @ProcessElement
- public void processElement(ProcessContext c) throws IOException {
- try (PubsubClient pubsubClient =
- FACTORY.newClient(timestampLabel, idLabel,
- c.getPipelineOptions().as(PubsubOptions.class))) {
-
- PubsubClient.SubscriptionPath subscriptionPath;
- if (getSubscription() == null) {
- TopicPath topicPath =
- PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
- // The subscription will be registered under this pipeline's project if we know it.
- // Otherwise we'll fall back to the topic's project.
- // Note that they don't need to be the same.
- String projectId =
- c.getPipelineOptions().as(PubsubOptions.class).getProject();
- if (Strings.isNullOrEmpty(projectId)) {
- projectId = getTopic().project;
- }
- ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
- try {
- subscriptionPath =
- pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create subscription: ", e);
- }
- } else {
+ /**
+ * Default reader when Pubsub subscription has some form of upper bound.
+ *
+ * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
+ * of PubsubUnboundedSource.
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+ * service in streaming mode.
+ *
+ * <p>Public so can be suppressed by runners.
+ */
+ public class PubsubBoundedReader extends DoFn<Void, T> {
+
+ private static final int DEFAULT_PULL_SIZE = 100;
+ private static final int ACK_TIMEOUT_SEC = 60;
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws IOException {
+ try (PubsubClient pubsubClient =
+ FACTORY.newClient(timestampLabel, idLabel,
+ c.getPipelineOptions().as(PubsubOptions.class))) {
+
+ PubsubClient.SubscriptionPath subscriptionPath;
+ if (getSubscription() == null) {
+ TopicPath topicPath =
+ PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
+ // The subscription will be registered under this pipeline's project if we know it.
+ // Otherwise we'll fall back to the topic's project.
+ // Note that they don't need to be the same.
+ String projectId =
+ c.getPipelineOptions().as(PubsubOptions.class).getProject();
+ if (Strings.isNullOrEmpty(projectId)) {
+ projectId = getTopic().project;
+ }
+ ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
+ try {
subscriptionPath =
- PubsubClient.subscriptionPathFromName(getSubscription().project,
- getSubscription().subscription);
+ pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create subscription: ", e);
}
+ } else {
+ subscriptionPath =
+ PubsubClient.subscriptionPathFromName(getSubscription().project,
+ getSubscription().subscription);
+ }
- Instant endTime = (getMaxReadTime() == null)
- ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
+ Instant endTime = (getMaxReadTime() == null)
+ ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
- List<IncomingMessage> messages = new ArrayList<>();
+ List<IncomingMessage> messages = new ArrayList<>();
- Throwable finallyBlockException = null;
- try {
- while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
- && Instant.now().isBefore(endTime)) {
- int batchSize = DEFAULT_PULL_SIZE;
- if (getMaxNumRecords() > 0) {
- batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
- }
-
- List<IncomingMessage> batchMessages =
- pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
- false);
- List<String> ackIds = new ArrayList<>();
- for (IncomingMessage message : batchMessages) {
- messages.add(message);
- ackIds.add(message.ackId);
- }
- if (ackIds.size() != 0) {
- pubsubClient.acknowledge(subscriptionPath, ackIds);
- }
+ Throwable finallyBlockException = null;
+ try {
+ while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
+ && Instant.now().isBefore(endTime)) {
+ int batchSize = DEFAULT_PULL_SIZE;
+ if (getMaxNumRecords() > 0) {
+ batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
}
- } catch (IOException e) {
- throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
- } finally {
- if (getSubscription() == null) {
- try {
- pubsubClient.deleteSubscription(subscriptionPath);
- } catch (Exception e) {
- finallyBlockException = e;
- }
+
+ List<IncomingMessage> batchMessages =
+ pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
+ false);
+ List<String> ackIds = new ArrayList<>();
+ for (IncomingMessage message : batchMessages) {
+ messages.add(message);
+ ackIds.add(message.ackId);
+ }
+ if (ackIds.size() != 0) {
+ pubsubClient.acknowledge(subscriptionPath, ackIds);
}
}
- if (finallyBlockException != null) {
- throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
+ } finally {
+ if (getSubscription() == null) {
+ try {
+ pubsubClient.deleteSubscription(subscriptionPath);
+ } catch (Exception e) {
+ finallyBlockException = e;
+ }
}
+ }
+ if (finallyBlockException != null) {
+ throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
+ }
- for (IncomingMessage message : messages) {
- T element = null;
- if (parseFn != null) {
- element = parseFn.apply(new PubsubMessage(
- message.elementBytes, message.attributes));
- } else {
- element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
- }
- c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
+ for (IncomingMessage message : messages) {
+ T element = null;
+ if (parseFn != null) {
+ element = parseFn.apply(new PubsubMessage(
+ message.elementBytes, message.attributes));
+ } else {
+ element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
}
+ c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
}
}
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Read.this);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(Read.this);
}
}
-
+ }
/////////////////////////////////////////////////////////////////////////////
@@ -928,12 +944,16 @@ public class PubsubIO {
* to a Cloud Pub/Sub stream.
*/
public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+
/** The Cloud Pub/Sub topic to publish to. */
- @Nullable private final ValueProvider<PubsubTopic> topic;
+ @Nullable
+ private final ValueProvider<PubsubTopic> topic;
/** The name of the message attribute to publish message timestamps in. */
- @Nullable private final String timestampLabel;
+ @Nullable
+ private final String timestampLabel;
/** The name of the message attribute to publish unique message IDs in. */
- @Nullable private final String idLabel;
+ @Nullable
+ private final String idLabel;
/** The input type Coder. */
private final Coder<T> coder;
/** The format function for input PubsubMessage objects. */
@@ -1017,7 +1037,7 @@ public class PubsubIO {
* to separately set the PubSub message's payload and attributes.
*/
public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
- return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
}
@Override
@@ -1092,7 +1112,7 @@ public class PubsubIO {
* Returns the formatting function used if publishing attributes.
*/
public SimpleFunction<T, PubsubMessage> getFormatFn() {
- return formatFn;
+ return formatFn;
}
/**
@@ -1104,6 +1124,7 @@ public class PubsubIO {
* <p>Public so can be suppressed by runners.
*/
public class PubsubBoundedWriter extends DoFn<T, Void> {
+
private static final int MAX_PUBLISH_BATCH_SIZE = 100;
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
@@ -1114,7 +1135,7 @@ public class PubsubIO {
// NOTE: idLabel is ignored.
this.pubsubClient =
FACTORY.newClient(timestampLabel, null,
- c.getPipelineOptions().as(PubsubOptions.class));
+ c.getPipelineOptions().as(PubsubOptions.class));
}
@ProcessElement