You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/20 23:00:09 UTC

beam git commit: Automated refactoring of PubsubIO to fix indentation

Repository: beam
Updated Branches:
  refs/heads/master c6e46b655 -> f799a57af


Automated refactoring of PubsubIO to fix indentation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f799a57a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f799a57a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f799a57a

Branch: refs/heads/master
Commit: f799a57af14722e1da26baee25bda03bf6a52b6e
Parents: c6e46b6
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jan 20 15:00:05 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jan 20 15:00:05 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 803 ++++++++++---------
 1 file changed, 412 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f799a57a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 2802871..1471953 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
  * {@link PipelineRunner PipelineRunners} for more details.
  */
 public class PubsubIO {
+
   private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
 
   /** Factory for creating pubsub client to manage transport. */
@@ -157,6 +158,7 @@ public class PubsubIO {
    * a map of attached attributes.
    */
   public static class PubsubMessage {
+
     private byte[] message;
     private Map<String, String> attributes;
 
@@ -193,7 +195,8 @@ public class PubsubIO {
    * Class representing a Cloud Pub/Sub Subscription.
    */
   public static class PubsubSubscription implements Serializable {
-    private enum Type { NORMAL, FAKE }
+
+    private enum Type {NORMAL, FAKE}
 
     private final Type type;
     private final String project;
@@ -299,6 +302,7 @@ public class PubsubIO {
    */
   private static class SubscriptionTranslator
       implements SerializableFunction<String, PubsubSubscription> {
+
     @Override
     public PubsubSubscription apply(String from) {
       return PubsubSubscription.fromPath(from);
@@ -310,6 +314,7 @@ public class PubsubIO {
    */
   private static class SubscriptionPathTranslator
       implements SerializableFunction<PubsubSubscription, SubscriptionPath> {
+
     @Override
     public SubscriptionPath apply(PubsubSubscription from) {
       return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
@@ -321,6 +326,7 @@ public class PubsubIO {
    */
   private static class TopicTranslator
       implements SerializableFunction<String, PubsubTopic> {
+
     @Override
     public PubsubTopic apply(String from) {
       return PubsubTopic.fromPath(from);
@@ -332,6 +338,7 @@ public class PubsubIO {
    */
   private static class TopicPathTranslator
       implements SerializableFunction<PubsubTopic, TopicPath> {
+
     @Override
     public TopicPath apply(PubsubTopic from) {
       return PubsubClient.topicPathFromName(from.project, from.topic);
@@ -343,6 +350,7 @@ public class PubsubIO {
    */
   private static class ProjectPathTranslator
       implements SerializableFunction<PubsubTopic, ProjectPath> {
+
     @Override
     public ProjectPath apply(PubsubTopic from) {
       return PubsubClient.projectPathFromId(from.project);
@@ -353,7 +361,8 @@ public class PubsubIO {
    * Class representing a Cloud Pub/Sub Topic.
    */
   public static class PubsubTopic implements Serializable {
-    private enum Type { NORMAL, FAKE }
+
+    private enum Type {NORMAL, FAKE}
 
     private final Type type;
     private final String project;
@@ -471,451 +480,458 @@ public class PubsubIO {
    * can be processed. As such, either {@link PubsubIO.Read#maxNumRecords(int)} or
    * {@link PubsubIO.Read#maxReadTime(Duration)} must be set.
    */
-   public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-      /** The Cloud Pub/Sub topic to read from. */
-      @Nullable private final ValueProvider<PubsubTopic> topic;
+  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
 
-      /** The Cloud Pub/Sub subscription to read from. */
-      @Nullable private final ValueProvider<PubsubSubscription> subscription;
+    /** The Cloud Pub/Sub topic to read from. */
+    @Nullable
+    private final ValueProvider<PubsubTopic> topic;
 
-      /** The name of the message attribute to read timestamps from. */
-      @Nullable private final String timestampLabel;
+    /** The Cloud Pub/Sub subscription to read from. */
+    @Nullable
+    private final ValueProvider<PubsubSubscription> subscription;
 
-      /** The name of the message attribute to read unique message IDs from. */
-      @Nullable private final String idLabel;
+    /** The name of the message attribute to read timestamps from. */
+    @Nullable
+    private final String timestampLabel;
 
-      /** The coder used to decode each record. */
-      @Nullable private final Coder<T> coder;
+    /** The name of the message attribute to read unique message IDs from. */
+    @Nullable
+    private final String idLabel;
 
-      /** Stop after reading this many records. */
-      private final int maxNumRecords;
+    /** The coder used to decode each record. */
+    @Nullable
+    private final Coder<T> coder;
 
-      /** Stop after reading for this much time. */
-      @Nullable private final Duration maxReadTime;
+    /** Stop after reading this many records. */
+    private final int maxNumRecords;
 
-      /** User function for parsing PubsubMessage object. */
-      SimpleFunction<PubsubMessage, T> parseFn;
+    /** Stop after reading for this much time. */
+    @Nullable
+    private final Duration maxReadTime;
 
-      private Read() {
-        this(null, null, null, null, null, null, 0, null, null);
-      }
+    /** User function for parsing PubsubMessage object. */
+    SimpleFunction<PubsubMessage, T> parseFn;
 
-      private Read(String name, ValueProvider<PubsubSubscription> subscription,
-          ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
-          String idLabel, int maxNumRecords, Duration maxReadTime,
-          SimpleFunction<PubsubMessage, T> parseFn) {
-        super(name);
-        this.subscription = subscription;
-        this.topic = topic;
-        this.timestampLabel = timestampLabel;
-        this.coder = coder;
-        this.idLabel = idLabel;
-        this.maxNumRecords = maxNumRecords;
-        this.maxReadTime = maxReadTime;
-        this.parseFn = parseFn;
-      }
+    private Read() {
+      this(null, null, null, null, null, null, 0, null, null);
+    }
 
-      /**
-       * Returns a transform that's like this one but reading from the
-       * given subscription.
-       *
-       * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
-       * of the {@code subscription} string.
-       *
-       * <p>Multiple readers reading from the same subscription will each receive
-       * some arbitrary portion of the data.  Most likely, separate readers should
-       * use their own subscriptions.
-       *
-       * <p>Does not modify this object.
-       */
-      public Read<T> subscription(String subscription) {
-        return subscription(StaticValueProvider.of(subscription));
-      }
+    private Read(String name, ValueProvider<PubsubSubscription> subscription,
+        ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
+        String idLabel, int maxNumRecords, Duration maxReadTime,
+        SimpleFunction<PubsubMessage, T> parseFn) {
+      super(name);
+      this.subscription = subscription;
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.coder = coder;
+      this.idLabel = idLabel;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+      this.parseFn = parseFn;
+    }
 
-      /**
-       * Like {@code subscription()} but with a {@link ValueProvider}.
-       */
-      public Read<T> subscription(ValueProvider<String> subscription) {
-        if (subscription.isAccessible()) {
-          // Validate.
-          PubsubSubscription.fromPath(subscription.get());
-        }
-        return new Read<>(name,
-            NestedValueProvider.of(subscription, new SubscriptionTranslator()),
-            topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
-      }
+    /**
+     * Returns a transform that's like this one but reading from the
+     * given subscription.
+     *
+     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
+     * of the {@code subscription} string.
+     *
+     * <p>Multiple readers reading from the same subscription will each receive
+     * some arbitrary portion of the data.  Most likely, separate readers should
+     * use their own subscriptions.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> subscription(String subscription) {
+      return subscription(StaticValueProvider.of(subscription));
+    }
 
-      /**
-       * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
-       * with {@link #subscription(String)}.
-       *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
-       * of the {@code topic} string.
-       *
-       * <p>The Beam runner will start reading data published on this topic from the time the
-       * pipeline is started. Any data published on the topic before the pipeline is started will
-       * not be read by the runner.
-       */
-      public Read<T> topic(String topic) {
-        return topic(StaticValueProvider.of(topic));
+    /**
+     * Like {@code subscription()} but with a {@link ValueProvider}.
+     */
+    public Read<T> subscription(ValueProvider<String> subscription) {
+      if (subscription.isAccessible()) {
+        // Validate.
+        PubsubSubscription.fromPath(subscription.get());
       }
+      return new Read<>(name,
+          NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+          topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+    }
 
-      /**
-       * Like {@code topic()} but with a {@link ValueProvider}.
-       */
-      public Read<T> topic(ValueProvider<String> topic) {
-        if (topic.isAccessible()) {
-          // Validate.
-            PubsubTopic.fromPath(topic.get());
-        }
-        return new Read<>(name, subscription,
-            NestedValueProvider.of(topic, new TopicTranslator()),
-            timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
-      }
+    /**
+     * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
+     * with {@link #subscription(String)}.
+     *
+     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
+     * of the {@code topic} string.
+     *
+     * <p>The Beam runner will start reading data published on this topic from the time the
+     * pipeline is started. Any data published on the topic before the pipeline is started will
+     * not be read by the runner.
+     */
+    public Read<T> topic(String topic) {
+      return topic(StaticValueProvider.of(topic));
+    }
 
-      /**
-       * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
-       * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
-       * parameter specifies the name of the attribute that contains the timestamp.
-       *
-       * <p>The timestamp value is expected to be represented in the attribute as either:
-       *
-       * <ul>
-       * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
-       * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
-       * value for this attribute.
-       * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
-       * sub-second component of the timestamp is optional, and digits beyond the first three
-       * (i.e., time units smaller than milliseconds) will be ignored.
-       * </ul>
-       *
-       * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
-       * the first time it sees each record. All windowing will be done relative to these
-       * timestamps.
-       *
-       * <p>By default, windows are emitted based on an estimate of when this source is likely
-       * done producing data for a given timestamp (referred to as the Watermark; see
-       * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
-       * specified with the windowing strategy &ndash; by default it will be output immediately.
-       *
-       * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
-       * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
-       *
-       * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
-       */
-      public Read<T> timestampLabel(String timestampLabel) {
-        return new Read<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
-            parseFn);
+    /**
+     * Like {@code topic()} but with a {@link ValueProvider}.
+     */
+    public Read<T> topic(ValueProvider<String> topic) {
+      if (topic.isAccessible()) {
+        // Validate.
+        PubsubTopic.fromPath(topic.get());
       }
+      return new Read<>(name, subscription,
+          NestedValueProvider.of(topic, new TopicTranslator()),
+          timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+    }
 
-     /**
-      * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
-      * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
-      * parameter specifies the attribute name. The value of the attribute can be any string
-      * that uniquely identifies this record.
-      *
-      * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
-      * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
-      * be delivered, and deduplication of the stream will be strictly best effort.
-      */
-      public Read<T> idLabel(String idLabel) {
-        return new Read<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
-            parseFn);
-      }
+    /**
+     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
+     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
+     * parameter specifies the name of the attribute that contains the timestamp.
+     *
+     * <p>The timestamp value is expected to be represented in the attribute as either:
+     *
+     * <ul>
+     * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
+     * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
+     * value for this attribute.
+     * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
+     * sub-second component of the timestamp is optional, and digits beyond the first three
+     * (i.e., time units smaller than milliseconds) will be ignored.
+     * </ul>
+     *
+     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+     * the first time it sees each record. All windowing will be done relative to these
+     * timestamps.
+     *
+     * <p>By default, windows are emitted based on an estimate of when this source is likely
+     * done producing data for a given timestamp (referred to as the Watermark; see
+     * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
+     * specified with the windowing strategy &ndash; by default it will be output immediately.
+     *
+     * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
+     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+     *
+     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
+     */
+    public Read<T> timestampLabel(String timestampLabel) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          parseFn);
+    }
 
-      /**
-       * Returns a transform that's like this one but that uses the given
-       * {@link Coder} to decode each record into a value of type {@code T}.
-       *
-       * <p>Does not modify this object.
-       */
-      public Read<T> withCoder(Coder<T> coder) {
-        return new Read<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
-            parseFn);
-      }
+    /**
+     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
+     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
+     * parameter specifies the attribute name. The value of the attribute can be any string
+     * that uniquely identifies this record.
+     *
+     * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
+     * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+     * be delivered, and deduplication of the stream will be strictly best effort.
+     */
+    public Read<T> idLabel(String idLabel) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          parseFn);
+    }
 
-      /**
-       * Causes the source to return a PubsubMessage that includes Pubsub attributes.
-       * The user must supply a parsing function to transform the PubsubMessage into an output type.
-       * A Coder for the output type T must be registered or set on the output via
-       * {@link PCollection#setCoder(Coder)}.
-       */
-      public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
-        return new Read<T>(
-            name, subscription, topic, timestampLabel, coder, idLabel,
-            maxNumRecords, maxReadTime, parseFn);
-      }
+    /**
+     * Returns a transform that's like this one but that uses the given
+     * {@link Coder} to decode each record into a value of type {@code T}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          parseFn);
+    }
 
-     /**
-      * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-      * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
-      *
-      * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
-      * bounded source.
-      */
-      public Read<T> maxNumRecords(int maxNumRecords) {
-        return new Read<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
-            parseFn);
-      }
+    /**
+     * Causes the source to return a PubsubMessage that includes Pubsub attributes.
+     * The user must supply a parsing function to transform the PubsubMessage into an output type.
+     * A Coder for the output type T must be registered or set on the output via
+     * {@link PCollection#setCoder(Coder)}.
+     */
+    public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+      return new Read<T>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          maxNumRecords, maxReadTime, parseFn);
+    }
 
-     /**
-      * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-      * duration during which records will be read.  The transform produces a <i>bounded</i>
-      * {@link PCollection}.
-      *
-      * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
-      * bounded source.
-      */
-      public Read<T> maxReadTime(Duration maxReadTime) {
-        return new Read<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
-            parseFn);
-      }
+    /**
+     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
+     * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
+     *
+     * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
+     * bounded source.
+     */
+    public Read<T> maxNumRecords(int maxNumRecords) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          parseFn);
+    }
 
-      @Override
-      public PCollection<T> expand(PBegin input) {
-        if (topic == null && subscription == null) {
-          throw new IllegalStateException("Need to set either the topic or the subscription for "
-              + "a PubsubIO.Read transform");
-        }
-        if (topic != null && subscription != null) {
-          throw new IllegalStateException("Can't set both the topic and the subscription for "
-              + "a PubsubIO.Read transform");
-        }
-        if (coder == null)  {
-          throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
-              + "the withCoder method.");
-        }
+    /**
+     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum
+     * duration during which records will be read.  The transform produces a <i>bounded</i>
+     * {@link PCollection}.
+     *
+     * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
+     * bounded source.
+     */
+    public Read<T> maxReadTime(Duration maxReadTime) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          parseFn);
+    }
 
-        boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (topic == null && subscription == null) {
+        throw new IllegalStateException("Need to set either the topic or the subscription for "
+            + "a PubsubIO.Read transform");
+      }
+      if (topic != null && subscription != null) {
+        throw new IllegalStateException("Can't set both the topic and the subscription for "
+            + "a PubsubIO.Read transform");
+      }
+      if (coder == null) {
+        throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
+            + "the withCoder method.");
+      }
 
-        if (boundedOutput) {
-          return input.getPipeline().begin()
-                      .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-                      .apply(ParDo.of(new PubsubBoundedReader()))
-                      .setCoder(coder);
-        } else {
-          @Nullable ValueProvider<ProjectPath> projectPath =
-              topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
-          @Nullable ValueProvider<TopicPath> topicPath =
-              topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
-          @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
-              subscription == null
-                  ? null
-                  : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
-          return input.getPipeline().begin()
-                      .apply(new PubsubUnboundedSource<T>(
-                          FACTORY, projectPath, topicPath, subscriptionPath,
-                          coder, timestampLabel, idLabel, parseFn));
-        }
+      boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
+
+      if (boundedOutput) {
+        return input.getPipeline().begin()
+            .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+            .apply(ParDo.of(new PubsubBoundedReader()))
+            .setCoder(coder);
+      } else {
+        @Nullable ValueProvider<ProjectPath> projectPath =
+            topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
+        @Nullable ValueProvider<TopicPath> topicPath =
+            topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
+        @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
+            subscription == null
+                ? null
+                : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
+        return input.getPipeline().begin()
+            .apply(new PubsubUnboundedSource<T>(
+                FACTORY, projectPath, topicPath, subscriptionPath,
+                coder, timestampLabel, idLabel, parseFn));
       }
+    }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
 
-        builder
-            .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
+      builder
+          .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
               .withLabel("Maximum Read Time"))
-            .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
+          .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
               .withLabel("Maximum Read Records"), 0);
 
-        if (subscription != null) {
-          String subscriptionString = subscription.isAccessible()
-              ? subscription.get().asPath() : subscription.toString();
-          builder.add(DisplayData.item("subscription", subscriptionString)
-              .withLabel("Pubsub Subscription"));
-        }
+      if (subscription != null) {
+        String subscriptionString = subscription.isAccessible()
+            ? subscription.get().asPath() : subscription.toString();
+        builder.add(DisplayData.item("subscription", subscriptionString)
+            .withLabel("Pubsub Subscription"));
       }
+    }
 
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return coder;
-      }
+    @Override
+    protected Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
 
-     /**
-      * Get the topic being read from.
-      */
-      public PubsubTopic getTopic() {
-        return topic == null ? null : topic.get();
-      }
+    /**
+     * Get the topic being read from.
+     */
+    public PubsubTopic getTopic() {
+      return topic == null ? null : topic.get();
+    }
 
-     /**
-      * Get the {@link ValueProvider} for the topic being read from.
-      */
-      public ValueProvider<PubsubTopic> getTopicProvider() {
-        return topic;
-      }
+    /**
+     * Get the {@link ValueProvider} for the topic being read from.
+     */
+    public ValueProvider<PubsubTopic> getTopicProvider() {
+      return topic;
+    }
 
-     /**
-      * Get the subscription being read from.
-      */
-      public PubsubSubscription getSubscription() {
-        return subscription == null ? null : subscription.get();
-      }
+    /**
+     * Get the subscription being read from.
+     */
+    public PubsubSubscription getSubscription() {
+      return subscription == null ? null : subscription.get();
+    }
 
-     /**
-      * Get the {@link ValueProvider} for the subscription being read from.
-      */
-      public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
-        return subscription;
-      }
+    /**
+     * Get the {@link ValueProvider} for the subscription being read from.
+     */
+    public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
+      return subscription;
+    }
 
-     /**
-      * Get the timestamp label.
-      */
-      public String getTimestampLabel() {
-        return timestampLabel;
-      }
+    /**
+     * Get the timestamp label.
+     */
+    public String getTimestampLabel() {
+      return timestampLabel;
+    }
 
-     /**
-      * Get the id label.
-      */
-      public String getIdLabel() {
-        return idLabel;
-      }
+    /**
+     * Get the id label.
+     */
+    public String getIdLabel() {
+      return idLabel;
+    }
 
 
-     /**
-      * Get the {@link Coder} used for the transform's output.
-      */
-      public Coder<T> getCoder() {
+    /**
+     * Get the {@link Coder} used for the transform's output.
+     */
+    public Coder<T> getCoder() {
       return coder;
     }
 
-     /**
-      * Get the maximum number of records to read.
-      */
-      public int getMaxNumRecords() {
-        return maxNumRecords;
-      }
+    /**
+     * Get the maximum number of records to read.
+     */
+    public int getMaxNumRecords() {
+      return maxNumRecords;
+    }
 
-     /**
-      * Get the maximum read time.
-      */
-      public Duration getMaxReadTime() {
-        return maxReadTime;
-      }
+    /**
+     * Get the maximum read time.
+     */
+    public Duration getMaxReadTime() {
+      return maxReadTime;
+    }
 
-      /**
-       * Get the parse function used for PubSub attributes.
-       */
-      public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
-          return parseFn;
-        }
+    /**
+     * Get the parse function used for PubSub attributes.
+     */
+    public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
+      return parseFn;
+    }
 
-      /**
-       * Default reader when Pubsub subscription has some form of upper bound.
-       *
-       * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
-       * of PubsubUnboundedSource.
-       *
-       * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
-       * service in streaming mode.
-       *
-       * <p>Public so can be suppressed by runners.
-       */
-      public class PubsubBoundedReader extends DoFn<Void, T> {
-        private static final int DEFAULT_PULL_SIZE = 100;
-        private static final int ACK_TIMEOUT_SEC = 60;
-
-        @ProcessElement
-        public void processElement(ProcessContext c) throws IOException {
-          try (PubsubClient pubsubClient =
-                   FACTORY.newClient(timestampLabel, idLabel,
-                                     c.getPipelineOptions().as(PubsubOptions.class))) {
-
-            PubsubClient.SubscriptionPath subscriptionPath;
-            if (getSubscription() == null) {
-              TopicPath topicPath =
-                  PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
-              // The subscription will be registered under this pipeline's project if we know it.
-              // Otherwise we'll fall back to the topic's project.
-              // Note that they don't need to be the same.
-              String projectId =
-                  c.getPipelineOptions().as(PubsubOptions.class).getProject();
-              if (Strings.isNullOrEmpty(projectId)) {
-                projectId = getTopic().project;
-              }
-              ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
-              try {
-                subscriptionPath =
-                    pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
-              } catch (Exception e) {
-                throw new RuntimeException("Failed to create subscription: ", e);
-              }
-            } else {
+    /**
+     * Default reader when Pubsub subscription has some form of upper bound.
+     *
+     * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
+     * of PubsubUnboundedSource.
+     *
+     * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+     * service in streaming mode.
+     *
+     * <p>Public so can be suppressed by runners.
+     */
+    public class PubsubBoundedReader extends DoFn<Void, T> {
+
+      private static final int DEFAULT_PULL_SIZE = 100;
+      private static final int ACK_TIMEOUT_SEC = 60;
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        try (PubsubClient pubsubClient =
+            FACTORY.newClient(timestampLabel, idLabel,
+                c.getPipelineOptions().as(PubsubOptions.class))) {
+
+          PubsubClient.SubscriptionPath subscriptionPath;
+          if (getSubscription() == null) {
+            TopicPath topicPath =
+                PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
+            // The subscription will be registered under this pipeline's project if we know it.
+            // Otherwise we'll fall back to the topic's project.
+            // Note that they don't need to be the same.
+            String projectId =
+                c.getPipelineOptions().as(PubsubOptions.class).getProject();
+            if (Strings.isNullOrEmpty(projectId)) {
+              projectId = getTopic().project;
+            }
+            ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
+            try {
               subscriptionPath =
-                  PubsubClient.subscriptionPathFromName(getSubscription().project,
-                                                        getSubscription().subscription);
+                  pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
+            } catch (Exception e) {
+              throw new RuntimeException("Failed to create subscription: ", e);
             }
+          } else {
+            subscriptionPath =
+                PubsubClient.subscriptionPathFromName(getSubscription().project,
+                    getSubscription().subscription);
+          }
 
-            Instant endTime = (getMaxReadTime() == null)
-                              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
+          Instant endTime = (getMaxReadTime() == null)
+              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
 
-            List<IncomingMessage> messages = new ArrayList<>();
+          List<IncomingMessage> messages = new ArrayList<>();
 
-            Throwable finallyBlockException = null;
-            try {
-              while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
-                     && Instant.now().isBefore(endTime)) {
-                int batchSize = DEFAULT_PULL_SIZE;
-                if (getMaxNumRecords() > 0) {
-                  batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
-                }
-
-                List<IncomingMessage> batchMessages =
-                    pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
-                        false);
-                List<String> ackIds = new ArrayList<>();
-                for (IncomingMessage message : batchMessages) {
-                  messages.add(message);
-                  ackIds.add(message.ackId);
-                }
-                if (ackIds.size() != 0) {
-                  pubsubClient.acknowledge(subscriptionPath, ackIds);
-                }
+          Throwable finallyBlockException = null;
+          try {
+            while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
+                && Instant.now().isBefore(endTime)) {
+              int batchSize = DEFAULT_PULL_SIZE;
+              if (getMaxNumRecords() > 0) {
+                batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
               }
-            } catch (IOException e) {
-              throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
-            } finally {
-              if (getSubscription() == null) {
-                try {
-                  pubsubClient.deleteSubscription(subscriptionPath);
-                } catch (Exception e) {
-                  finallyBlockException = e;
-                }
+
+              List<IncomingMessage> batchMessages =
+                  pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
+                      false);
+              List<String> ackIds = new ArrayList<>();
+              for (IncomingMessage message : batchMessages) {
+                messages.add(message);
+                ackIds.add(message.ackId);
+              }
+              if (ackIds.size() != 0) {
+                pubsubClient.acknowledge(subscriptionPath, ackIds);
               }
             }
-            if (finallyBlockException != null) {
-              throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
+          } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
+          } finally {
+            if (getSubscription() == null) {
+              try {
+                pubsubClient.deleteSubscription(subscriptionPath);
+              } catch (Exception e) {
+                finallyBlockException = e;
+              }
             }
+          }
+          if (finallyBlockException != null) {
+            throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
+          }
 
-            for (IncomingMessage message : messages) {
-              T element = null;
-              if (parseFn != null) {
-                element = parseFn.apply(new PubsubMessage(
-                        message.elementBytes, message.attributes));
-              } else {
-                element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
-              }
-              c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
+          for (IncomingMessage message : messages) {
+            T element = null;
+            if (parseFn != null) {
+              element = parseFn.apply(new PubsubMessage(
+                  message.elementBytes, message.attributes));
+            } else {
+              element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
             }
+            c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
           }
         }
+      }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          builder.delegate(Read.this);
-        }
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.delegate(Read.this);
       }
     }
-
+  }
 
   /////////////////////////////////////////////////////////////////////////////
 
@@ -928,12 +944,16 @@ public class PubsubIO {
    * to a Cloud Pub/Sub stream.
    */
   public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+
     /** The Cloud Pub/Sub topic to publish to. */
-    @Nullable private final ValueProvider<PubsubTopic> topic;
+    @Nullable
+    private final ValueProvider<PubsubTopic> topic;
     /** The name of the message attribute to publish message timestamps in. */
-    @Nullable private final String timestampLabel;
+    @Nullable
+    private final String timestampLabel;
     /** The name of the message attribute to publish unique message IDs in. */
-    @Nullable private final String idLabel;
+    @Nullable
+    private final String idLabel;
     /** The input type Coder. */
     private final Coder<T> coder;
     /** The format function for input PubsubMessage objects. */
@@ -1017,7 +1037,7 @@ public class PubsubIO {
      * to separately set the PubSub message's payload and attributes.
      */
     public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
-        return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
     }
 
     @Override
@@ -1092,7 +1112,7 @@ public class PubsubIO {
      * Returns the formatting function used if publishing attributes.
      */
     public SimpleFunction<T, PubsubMessage> getFormatFn() {
-        return formatFn;
+      return formatFn;
     }
 
     /**
@@ -1104,6 +1124,7 @@ public class PubsubIO {
      * <p>Public so can be suppressed by runners.
      */
     public class PubsubBoundedWriter extends DoFn<T, Void> {
+
       private static final int MAX_PUBLISH_BATCH_SIZE = 100;
       private transient List<OutgoingMessage> output;
       private transient PubsubClient pubsubClient;
@@ -1114,7 +1135,7 @@ public class PubsubIO {
         // NOTE: idLabel is ignored.
         this.pubsubClient =
             FACTORY.newClient(timestampLabel, null,
-                              c.getPipelineOptions().as(PubsubOptions.class));
+                c.getPipelineOptions().as(PubsubOptions.class));
       }
 
       @ProcessElement