You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:57 UTC

[5/9] beam git commit: Converts PubsubIO.Write to AutoValue

Converts PubsubIO.Write to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df6ef969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df6ef969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df6ef969

Branch: refs/heads/master
Commit: df6ef969d6df5c42d091cc00997b0ed7680315fb
Parents: 9e81548
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:34:11 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 166 +++++++------------
 1 file changed, 61 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/df6ef969/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 69a5bd6..5702af1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -461,8 +461,9 @@ public class PubsubIO {
     return new AutoValue_PubsubIO_Read.Builder<T>().build();
   }
 
+  /** Returns a {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
   public static <T> Write<T> write() {
-    return new Write<>();
+    return new AutoValue_PubsubIO_Write.Builder<T>().build();
   }
 
   /** Implementation of {@link #read}. */
@@ -696,43 +697,47 @@ public class PubsubIO {
   private PubsubIO() {}
 
 
-  /**
-   * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
-   * to a Cloud Pub/Sub stream.
-   */
-  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
-
-    /** The Cloud Pub/Sub topic to publish to. */
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
-    private final ValueProvider<PubsubTopic> topic;
+    abstract ValueProvider<PubsubTopic> getTopicProvider();
+
     /** The name of the message attribute to publish message timestamps in. */
     @Nullable
-    private final String timestampLabel;
+    abstract String getTimestampLabel();
+
     /** The name of the message attribute to publish unique message IDs in. */
     @Nullable
-    private final String idLabel;
+    abstract String getIdLabel();
+
     /** The input type Coder. */
-    private final Coder<T> coder;
+    @Nullable
+    abstract Coder<T> getCoder();
+
     /** The format function for input PubsubMessage objects. */
-    SimpleFunction<T, PubsubMessage> formatFn;
+    @Nullable
+    abstract SimpleFunction<T, PubsubMessage> getFormatFn();
 
-    private Write() {
-      this(null, null, null, null, null, null);
-    }
+    abstract Builder<T> toBuilder();
 
-    private Write(
-        String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
-        String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
-      super(name);
-      this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
-      this.coder = coder;
-      this.formatFn = formatFn;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);
+
+      abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+      abstract Builder<T> setIdLabel(String idLabel);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn);
+
+      abstract Write<T> build();
     }
 
     /**
-     * Creates a transform that publishes to the specified topic.
+     * Publishes to the specified topic.
      *
      * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
      * {@code topic} string.
@@ -745,14 +750,15 @@ public class PubsubIO {
      * Like {@code topic()} but with a {@link ValueProvider}.
      */
     public Write<T> topic(ValueProvider<String> topic) {
-      return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, idLabel, coder, formatFn);
+      return toBuilder()
+          .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+          .build();
     }
 
     /**
-     * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published
-     * messages in an attribute with the specified name. The value of the attribute will be a number
-     * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
+     * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
+     * with the specified name. The value of the attribute will be a number representing the number
+     * of milliseconds since the Unix epoch. For example, if using the Joda
      * time classes, {@link Instant#Instant(long)} can be used to parse this value.
      *
      * <p>If the output from this sink is being read by another Beam pipeline, then
@@ -760,32 +766,27 @@ public class PubsubIO {
      * these timestamps from the appropriate attribute.
      */
     public Write<T> timestampLabel(String timestampLabel) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setTimestampLabel(timestampLabel).build();
     }
 
     /**
-     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
-     * published messages in an attribute with the specified name. The value of the attribute is an
-     * opaque string.
+     * Writes to Pub/Sub, adding each record's unique identifier to the published messages in an
+     * attribute with the specified name. The value of the attribute is an opaque string.
      *
      * <p>If the output from this sink is being read by another Beam pipeline, then
      * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that the other source reads
      * these unique identifiers from the appropriate attribute.
      */
     public Write<T> idLabel(String idLabel) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setIdLabel(idLabel).build();
     }
 
     /**
-     * Returns a new transform that's like this one
-     * but that uses the given {@link Coder} to encode each of
-     * the elements of the input {@link PCollection} into an
-     * output record.
-     *
-     * <p>Does not modify this object.
+     * Uses the given {@link Coder} to encode each of the elements of the input {@link PCollection}
+     * into an output record.
      */
     public Write<T> withCoder(Coder<T> coder) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setCoder(coder).build();
     }
 
     /**
@@ -794,12 +795,12 @@ public class PubsubIO {
      * to separately set the PubSub message's payload and attributes.
      */
     public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
-      return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setFormatFn(formatFn).build();
     }
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (topic == null) {
+      if (getTopicProvider() == null) {
         throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
       }
       switch (input.isBounded()) {
@@ -809,11 +810,11 @@ public class PubsubIO {
         case UNBOUNDED:
           return input.apply(new PubsubUnboundedSink<T>(
               FACTORY,
-              NestedValueProvider.of(topic, new TopicPathTranslator()),
-              coder,
-              timestampLabel,
-              idLabel,
-              formatFn,
+              NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
+              getCoder(),
+              getTimestampLabel(),
+              getIdLabel(),
+              getFormatFn(),
               100 /* numShards */));
       }
       throw new RuntimeException(); // cases are exhaustive.
@@ -822,7 +823,7 @@ public class PubsubIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
     }
 
     @Override
@@ -831,54 +832,6 @@ public class PubsubIO {
     }
 
     /**
-     * Returns the PubSub topic being written to.
-     */
-    @Nullable
-    public PubsubTopic getTopic() {
-      return (topic == null) ? null : topic.get();
-    }
-
-    /**
-     * Returns the {@link ValueProvider} for the topic being written to.
-     */
-    @Nullable
-    public ValueProvider<PubsubTopic> getTopicProvider() {
-      return topic;
-    }
-
-    /**
-     * Returns the timestamp label.
-     */
-    @Nullable
-    public String getTimestampLabel() {
-      return timestampLabel;
-    }
-
-    /**
-     * Returns the id label.
-     */
-    @Nullable
-    public String getIdLabel() {
-      return idLabel;
-    }
-
-    /**
-     * Returns the output coder.
-     */
-    @Nullable
-    public Coder<T> getCoder() {
-      return coder;
-    }
-
-    /**
-     * Returns the formatting function used if publishing attributes.
-     */
-    @Nullable
-    public SimpleFunction<T, PubsubMessage> getFormatFn() {
-      return formatFn;
-    }
-
-    /**
      * Writer to Pubsub which batches messages from bounded collections.
      *
      * <p>Public so can be suppressed by runners.
@@ -894,7 +847,7 @@ public class PubsubIO {
         this.output = new ArrayList<>();
         // NOTE: idLabel is ignored.
         this.pubsubClient =
-            FACTORY.newClient(timestampLabel, null,
+            FACTORY.newClient(getTimestampLabel(), null,
                 c.getPipelineOptions().as(PubsubOptions.class));
       }
 
@@ -902,8 +855,8 @@ public class PubsubIO {
       public void processElement(ProcessContext c) throws IOException {
         byte[] payload = null;
         Map<String, String> attributes = null;
-        if (formatFn != null) {
-          PubsubMessage message = formatFn.apply(c.element());
+        if (getFormatFn() != null) {
+          PubsubMessage message = getFormatFn().apply(c.element());
           payload = message.getMessage();
           attributes = message.getAttributeMap();
         } else {
@@ -930,9 +883,12 @@ public class PubsubIO {
       }
 
       private void publish() throws IOException {
-        int n = pubsubClient.publish(
-            PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
-            output);
+        PubsubTopic topic = getTopicProvider().get();
+        int n =
+            pubsubClient.publish(
+                PubsubClient.topicPathFromName(
+                    topic.project, topic.topic),
+                output);
         checkState(n == output.size());
         output.clear();
       }