You are viewing a plain-text version of this content. The canonical link to it (originally shown as "here") was not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:56 UTC

[4/9] beam git commit: Converts PubsubIO.Read to AutoValue

Converts PubsubIO.Read to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4d04606
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4d04606
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4d04606

Branch: refs/heads/master
Commit: f4d04606c105ca45a7754516781cb72b4c818baf
Parents: f5e3f52
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:14:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 244 +++++++------------
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |   8 +-
 2 files changed, 95 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index f0926d4..3c76942 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -455,64 +456,61 @@ public class PubsubIO {
     }
   }
 
+  /** Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
   public static <T> Read<T> read() {
-    return new Read<>();
+    return new AutoValue_PubsubIO_Read.Builder<T>().build();
   }
 
   public static <T> Write<T> write() {
     return new Write<>();
   }
 
-  /**
-   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
-   * returns a {@link PCollection} of {@link String Strings} containing the items from
-   * the stream.
-   */
-  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
-    /** The Cloud Pub/Sub topic to read from. */
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable
-    private final ValueProvider<PubsubTopic> topic;
+    abstract ValueProvider<PubsubTopic> getTopicProvider();
 
-    /** The Cloud Pub/Sub subscription to read from. */
     @Nullable
-    private final ValueProvider<PubsubSubscription> subscription;
+    abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();
 
     /** The name of the message attribute to read timestamps from. */
     @Nullable
-    private final String timestampLabel;
+    abstract String getTimestampLabel();
 
     /** The name of the message attribute to read unique message IDs from. */
     @Nullable
-    private final String idLabel;
+    abstract String getIdLabel();
 
     /** The coder used to decode each record. */
     @Nullable
-    private final Coder<T> coder;
+    abstract Coder<T> getCoder();
 
     /** User function for parsing PubsubMessage object. */
-    SimpleFunction<PubsubMessage, T> parseFn;
+    @Nullable
+    abstract SimpleFunction<PubsubMessage, T> getParseFn();
 
-    private Read() {
-      this(null, null, null, null, null, null, null);
-    }
+    abstract Builder<T> toBuilder();
 
-    private Read(String name, ValueProvider<PubsubSubscription> subscription,
-        ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
-        String idLabel,
-        SimpleFunction<PubsubMessage, T> parseFn) {
-      super(name);
-      this.subscription = subscription;
-      this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.coder = coder;
-      this.idLabel = idLabel;
-      this.parseFn = parseFn;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);
+
+      abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
+
+      abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+      abstract Builder<T> setIdLabel(String idLabel);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
+
+      abstract Read<T> build();
     }
 
     /**
-     * Returns a transform that's like this one but reading from the
-     * given subscription.
+     * Reads from the given subscription.
      *
      * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
      * of the {@code subscription} string.
@@ -520,8 +518,6 @@ public class PubsubIO {
      * <p>Multiple readers reading from the same subscription will each receive
      * some arbitrary portion of the data.  Most likely, separate readers should
      * use their own subscriptions.
-     *
-     * <p>Does not modify this object.
      */
     public Read<T> subscription(String subscription) {
       return subscription(StaticValueProvider.of(subscription));
@@ -535,9 +531,12 @@ public class PubsubIO {
         // Validate.
         PubsubSubscription.fromPath(subscription.get());
       }
-      return new Read<>(
-          name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
-          null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
+      return toBuilder()
+          .setSubscriptionProvider(
+              NestedValueProvider.of(subscription, new SubscriptionTranslator()))
+          /* reset topic to null */
+          .setTopicProvider(null)
+          .build();
     }
 
     /**
@@ -563,15 +562,16 @@ public class PubsubIO {
         // Validate.
         PubsubTopic.fromPath(topic.get());
       }
-      return new Read<>(name, null /* reset subscription to null */,
-          NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, coder, idLabel, parseFn);
+      return toBuilder()
+          .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+          /* reset subscription to null */
+          .setSubscriptionProvider(null)
+          .build();
     }
 
     /**
-     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
-     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
-     * parameter specifies the name of the attribute that contains the timestamp.
+     * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
+     * attributes, specifies the name of the attribute that contains the timestamp.
      *
      * <p>The timestamp value is expected to be represented in the attribute as either:
      *
@@ -599,88 +599,90 @@ public class PubsubIO {
      * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
      */
     public Read<T> timestampLabel(String timestampLabel) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setTimestampLabel(timestampLabel).build();
     }
 
     /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
-     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
-     * parameter specifies the attribute name. The value of the attribute can be any string
-     * that uniquely identifies this record.
+     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
+     * message attributes, specifies the name of the attribute containing the unique identifier.
+     * The value of the attribute can be any string that uniquely identifies this record.
      *
      * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
      * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
      * be delivered, and deduplication of the stream will be strictly best effort.
      */
     public Read<T> idLabel(String idLabel) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setIdLabel(idLabel).build();
     }
 
     /**
-     * Returns a transform that's like this one but that uses the given
-     * {@link Coder} to decode each record into a value of type {@code T}.
-     *
-     * <p>Does not modify this object.
+     * Uses the given {@link Coder} to decode each record into a value of type {@code T}.
      */
     public Read<T> withCoder(Coder<T> coder) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setCoder(coder).build();
     }
 
     /**
-     * Causes the source to return a PubsubMessage that includes Pubsub attributes.
-     * The user must supply a parsing function to transform the PubsubMessage into an output type.
+     * Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
+     * given parsing function to transform the PubsubMessage into an output type.
      * A Coder for the output type T must be registered or set on the output via
      * {@link PCollection#setCoder(Coder)}.
      */
     public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
-      return new Read<T>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setParseFn(parseFn).build();
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (topic == null && subscription == null) {
-        throw new IllegalStateException("Need to set either the topic or the subscription for "
-            + "a PubsubIO.Read transform");
+      if (getTopicProvider() == null && getSubscriptionProvider() == null) {
+        throw new IllegalStateException(
+            "Need to set either the topic or the subscription for " + "a PubsubIO.Read transform");
       }
-      if (topic != null && subscription != null) {
-        throw new IllegalStateException("Can't set both the topic and the subscription for "
-            + "a PubsubIO.Read transform");
+      if (getTopicProvider() != null && getSubscriptionProvider() != null) {
+        throw new IllegalStateException(
+            "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform");
       }
-      if (coder == null) {
-        throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
-            + "the withCoder method.");
+      if (getCoder() == null) {
+        throw new IllegalStateException(
+            "PubsubIO.Read requires that a coder be set using " + "the withCoder method.");
       }
 
-      @Nullable ValueProvider<ProjectPath> projectPath =
-          topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
-      @Nullable ValueProvider<TopicPath> topicPath =
-          topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
-      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
-          subscription == null
+      @Nullable
+      ValueProvider<ProjectPath> projectPath =
+          getTopicProvider() == null
               ? null
-              : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
-      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
-              FACTORY, projectPath, topicPath, subscriptionPath,
-              coder, timestampLabel, idLabel, parseFn);
+              : NestedValueProvider.of(getTopicProvider(), new ProjectPathTranslator());
+      @Nullable
+      ValueProvider<TopicPath> topicPath =
+          getTopicProvider() == null
+              ? null
+              : NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator());
+      @Nullable
+      ValueProvider<SubscriptionPath> subscriptionPath =
+          getSubscriptionProvider() == null
+              ? null
+              : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
+      PubsubUnboundedSource<T> source =
+          new PubsubUnboundedSource<T>(
+              FACTORY,
+              projectPath,
+              topicPath,
+              subscriptionPath,
+              getCoder(),
+              getTimestampLabel(),
+              getIdLabel(),
+              getParseFn());
       return input.getPipeline().apply(source);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
 
-      if (subscription != null) {
-        String subscriptionString = subscription.isAccessible()
-            ? subscription.get().asPath() : subscription.toString();
+      if (getSubscriptionProvider() != null) {
+        String subscriptionString = getSubscriptionProvider().isAccessible()
+            ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString();
         builder.add(DisplayData.item("subscription", subscriptionString)
             .withLabel("Pubsub Subscription"));
       }
@@ -688,72 +690,8 @@ public class PubsubIO {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return coder;
+      return getCoder();
     }
-
-    /**
-     * Get the topic being read from.
-     */
-    @Nullable
-    public PubsubTopic getTopic() {
-      return topic == null ? null : topic.get();
-    }
-
-    /**
-     * Get the {@link ValueProvider} for the topic being read from.
-     */
-    public ValueProvider<PubsubTopic> getTopicProvider() {
-      return topic;
-    }
-
-    /**
-     * Get the subscription being read from.
-     */
-    @Nullable
-    public PubsubSubscription getSubscription() {
-      return subscription == null ? null : subscription.get();
-    }
-
-    /**
-     * Get the {@link ValueProvider} for the subscription being read from.
-     */
-    public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
-      return subscription;
-    }
-
-    /**
-     * Get the timestamp label.
-     */
-    @Nullable
-    public String getTimestampLabel() {
-      return timestampLabel;
-    }
-
-    /**
-     * Get the id label.
-     */
-    @Nullable
-    public String getIdLabel() {
-      return idLabel;
-    }
-
-
-    /**
-     * Get the {@link Coder} used for the transform's output.
-     */
-    @Nullable
-    public Coder<T> getCoder() {
-      return coder;
-    }
-
-    /**
-     * Get the parse function used for PubSub attributes.
-     */
-    @Nullable
-    public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
-      return parseFn;
-    }
-
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 6e9922c..7fe6e26 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -126,8 +126,8 @@ public class PubsubIOTest {
     String subscription = "projects/project/subscriptions/subscription";
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .subscription(StaticValueProvider.of(subscription));
-    assertNull(read.getTopic());
-    assertNotNull(read.getSubscription());
+    assertNull(read.getTopicProvider());
+    assertNotNull(read.getSubscriptionProvider());
     assertNotNull(DisplayData.from(read));
   }
 
@@ -136,8 +136,8 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic));
-    assertNotNull(read.getTopic());
-    assertNull(read.getSubscription());
+    assertNotNull(read.getTopicProvider());
+    assertNull(read.getSubscriptionProvider());
     assertNotNull(DisplayData.from(read));
   }