You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:53 UTC

[1/9] beam git commit: Renames {id, timestamp}Label to {id, timestamp}Attribute throughout SDK

Repository: beam
Updated Branches:
  refs/heads/master f5e3f5230 -> 14d60b26e


Renames {id,timestamp}Label to {id,timestamp}Attribute throughout SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8853d53d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8853d53d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8853d53d

Branch: refs/heads/master
Commit: 8853d53d9ffdf6e68c80880f6dd5f2d11a6e451e
Parents: f065114
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 27 17:19:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../examples/complete/game/LeaderBoard.java     |  2 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 18 ++---
 .../org/apache/beam/sdk/util/PropertyNames.java |  4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java    | 42 ++++++-----
 .../sdk/io/gcp/pubsub/PubsubGrpcClient.java     | 36 +++++-----
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 74 ++++++++++----------
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     | 36 +++++-----
 .../sdk/io/gcp/pubsub/PubsubTestClient.java     |  6 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  | 58 ++++++++-------
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 61 +++++++++-------
 .../sdk/io/gcp/pubsub/PubsubClientTest.java     | 50 ++++++-------
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 16 +++--
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    | 24 +++----
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 14 ++--
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  | 10 +--
 .../gcp/pubsub/PubsubUnboundedSourceTest.java   |  6 +-
 17 files changed, 238 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d628497..a46d3c5 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -252,7 +252,7 @@ public class GameStats extends LeaderBoard {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
         .apply(PubsubIO.readStrings()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // Extract username/score pairs from the event stream

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index fbffac6..9af34c5 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -191,7 +191,7 @@ public class LeaderBoard extends HourlyTeamScore {
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
         .apply(PubsubIO.readStrings()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     gameEvents.apply("CalculateTeamScores",

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 63c2191..a61fe49 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -922,12 +922,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName());
         }
       }
-      if (overriddenTransform.getTimestampLabel() != null) {
+      if (overriddenTransform.getTimestampAttribute() != null) {
         stepContext.addInput(
-            PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
+            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, overriddenTransform.getTimestampAttribute());
       }
-      if (overriddenTransform.getIdLabel() != null) {
-        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+      if (overriddenTransform.getIdAttribute() != null) {
+        stepContext.addInput(
+            PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
       if (overriddenTransform.getWithAttributesParseFn() != null) {
         stepContext.addInput(
@@ -997,12 +998,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PropertyNames.PUBSUB_TOPIC_OVERRIDE,
             ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
       }
-      if (overriddenTransform.getTimestampLabel() != null) {
+      if (overriddenTransform.getTimestampAttribute() != null) {
         stepContext.addInput(
-            PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
+            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, overriddenTransform.getTimestampAttribute());
       }
-      if (overriddenTransform.getIdLabel() != null) {
-        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+      if (overriddenTransform.getIdAttribute() != null) {
+        stepContext.addInput(
+            PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
       if (overriddenTransform.getFormatFn() != null) {
         stepContext.addInput(

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index ee25448..aa5855b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -82,11 +82,11 @@ public class PropertyNames {
   public static final String OUTPUT_NAME = "output_name";
   public static final String PARALLEL_INPUT = "parallel_input";
   public static final String PHASE = "phase";
-  public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
+  public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
   public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
   public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
   public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
-  public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
+  public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
   public static final String PUBSUB_TOPIC = "pubsub_topic";
   public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
   public static final String SCALAR_FIELD_NAME = "value";

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 3a69799..cfe36ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -42,16 +42,15 @@ abstract class PubsubClient implements Closeable {
    */
   public interface PubsubClientFactory extends Serializable {
     /**
-     * Construct a new Pubsub client. It should be closed via {@link #close} in order
-     * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
-     * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
-     * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
-     * timestamps/ids within message metadata.
+     * Construct a new Pubsub client. It should be closed via {@link #close} in order to ensure tidy
+     * cleanup of underlying netty resources (or use the try-with-resources construct). Uses {@code
+     * options} to derive pubsub endpoints and application credentials. If non-{@literal null}, use
+     * {@code timestampAttribute} and {@code idAttribute} to store custom timestamps/ids within
+     * message metadata.
      */
     PubsubClient newClient(
-        @Nullable String timestampLabel,
-        @Nullable String idLabel,
-        PubsubOptions options) throws IOException;
+        @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
+        throws IOException;
 
     /**
      * Return the display name for this factory. Eg "Json", "gRPC".
@@ -86,33 +85,33 @@ abstract class PubsubClient implements Closeable {
    * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
    * attributes} and {@code pubsubTimestamp}.
    *
-   * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
-   * that label, and the value of that label will be taken as the timestamp.
+   * <p>If {@code timestampAttribute} is non-{@literal null} then the message attributes must
+   * contain that attribute, and the value of that attribute will be taken as the timestamp.
    * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
    * pubsubTimestamp}.
    *
    * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
-   * or RFC3339 time.
+   *     or RFC3339 time.
    */
   protected static long extractTimestamp(
-      @Nullable String timestampLabel,
+      @Nullable String timestampAttribute,
       @Nullable String pubsubTimestamp,
       @Nullable Map<String, String> attributes) {
     Long timestampMsSinceEpoch;
-    if (Strings.isNullOrEmpty(timestampLabel)) {
+    if (Strings.isNullOrEmpty(timestampAttribute)) {
       timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
       checkArgument(timestampMsSinceEpoch != null,
                     "Cannot interpret PubSub publish timestamp: %s",
                     pubsubTimestamp);
     } else {
-      String value = attributes == null ? null : attributes.get(timestampLabel);
+      String value = attributes == null ? null : attributes.get(timestampAttribute);
       checkArgument(value != null,
-                    "PubSub message is missing a value for timestamp label %s",
-                    timestampLabel);
+                    "PubSub message is missing a value for timestamp attribute %s",
+                    timestampAttribute);
       timestampMsSinceEpoch = asMsSinceEpoch(value);
       checkArgument(timestampMsSinceEpoch != null,
-                    "Cannot interpret value of label %s as timestamp: %s",
-                    timestampLabel, value);
+                    "Cannot interpret value of attribute %s as timestamp: %s",
+                    timestampAttribute, value);
     }
     return timestampMsSinceEpoch;
   }
@@ -317,11 +316,10 @@ abstract class PubsubClient implements Closeable {
     public final long timestampMsSinceEpoch;
 
     /**
-     * If using an id label, the record id to associate with this record's metadata so the receiver
-     * can reject duplicates. Otherwise {@literal null}.
+     * If using an id attribute, the record id to associate with this record's metadata so the
+     * receiver can reject duplicates. Otherwise {@literal null}.
      */
-    @Nullable
-    public final String recordId;
+    @Nullable public final String recordId;
 
     public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
                            long timestampMsSinceEpoch, @Nullable String recordId) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 16de648..9778edf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -79,7 +79,7 @@ class PubsubGrpcClient extends PubsubClient {
   private static class PubsubGrpcClientFactory implements PubsubClientFactory {
     @Override
     public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
         throws IOException {
       ManagedChannel channel = NettyChannelBuilder
           .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
@@ -87,8 +87,8 @@ class PubsubGrpcClient extends PubsubClient {
           .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
           .build();
 
-      return new PubsubGrpcClient(timestampLabel,
-                                  idLabel,
+      return new PubsubGrpcClient(timestampAttribute,
+                                  idAttribute,
                                   DEFAULT_TIMEOUT_S,
                                   channel,
                                   options.getGcpCredential());
@@ -122,17 +122,17 @@ class PubsubGrpcClient extends PubsubClient {
   private final Credentials credentials;
 
   /**
-   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * Attribute to use for custom timestamps, or {@literal null} if should use Pubsub publish time
    * instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   * Attribute to use for custom ids, or {@literal null} if should use Pubsub provided ids.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
 
   /**
@@ -144,13 +144,13 @@ class PubsubGrpcClient extends PubsubClient {
 
   @VisibleForTesting
   PubsubGrpcClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       int timeoutSec,
       ManagedChannel publisherChannel,
       Credentials credentials) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.timeoutSec = timeoutSec;
     this.publisherChannel = publisherChannel;
     this.credentials = credentials;
@@ -226,13 +226,13 @@ class PubsubGrpcClient extends PubsubClient {
         message.putAllAttributes(outgoingMessage.attributes);
       }
 
-      if (timestampLabel != null) {
+      if (timestampAttribute != null) {
         message.getMutableAttributes()
-               .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+               .put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
+      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        message.getMutableAttributes().put(idAttribute, outgoingMessage.recordId);
       }
 
       request.addMessages(message);
@@ -273,7 +273,7 @@ class PubsubGrpcClient extends PubsubClient {
                                                + timestampProto.getNanos() / 1000L);
       }
       long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+          extractTimestamp(timestampAttribute, pubsubTimestampString, attributes);
 
       // Ack id.
       String ackId = message.getAckId();
@@ -281,8 +281,8 @@ class PubsubGrpcClient extends PubsubClient {
 
       // Record id, if any.
       @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
+      if (idAttribute != null && attributes != null) {
+        recordId = attributes.get(idAttribute);
       }
       if (Strings.isNullOrEmpty(recordId)) {
         // Fall back to the Pubsub provided message id.

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3a7522e..129a25f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -136,12 +136,12 @@ public class PubsubIO {
    * Populate common {@link DisplayData} between Pubsub source and sink.
    */
   private static void populateCommonDisplayData(DisplayData.Builder builder,
-      String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
+      String timestampAttribute, String idAttribute, ValueProvider<PubsubTopic> topic) {
     builder
-        .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
-            .withLabel("Timestamp Label Attribute"))
-        .addIfNotNull(DisplayData.item("idLabel", idLabel)
-            .withLabel("ID Label Attribute"));
+        .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)
+            .withLabel("Timestamp Attribute"))
+        .addIfNotNull(DisplayData.item("idAttribute", idAttribute)
+            .withLabel("ID Attribute"));
 
     if (topic != null) {
       String topicString = topic.isAccessible() ? topic.get().asPath()
@@ -529,11 +529,11 @@ public class PubsubIO {
 
     /** The name of the message attribute to read timestamps from. */
     @Nullable
-    abstract String getTimestampLabel();
+    abstract String getTimestampAttribute();
 
     /** The name of the message attribute to read unique message IDs from. */
     @Nullable
-    abstract String getIdLabel();
+    abstract String getIdAttribute();
 
     /** The coder used to decode each record. */
     @Nullable
@@ -551,9 +551,9 @@ public class PubsubIO {
 
       abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
 
-      abstract Builder<T> setTimestampLabel(String timestampLabel);
+      abstract Builder<T> setTimestampAttribute(String timestampAttribute);
 
-      abstract Builder<T> setIdLabel(String idLabel);
+      abstract Builder<T> setIdAttribute(String idAttribute);
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
@@ -633,7 +633,7 @@ public class PubsubIO {
      * (i.e., time units smaller than milliseconds) will be ignored.
      * </ul>
      *
-     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+     * <p>If {@code timestampAttribute} is not provided, the system will generate record timestamps
      * the first time it sees each record. All windowing will be done relative to these
      * timestamps.
      *
@@ -643,12 +643,12 @@ public class PubsubIO {
      * specified with the windowing strategy &ndash; by default it will be output immediately.
      *
      * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
-     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+     * timestamps by arrival time (i.e. {@code timestampAttribute} is not provided).
      *
      * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
      */
-    public Read<T> withTimestampLabel(String timestampLabel) {
-      return toBuilder().setTimestampLabel(timestampLabel).build();
+    public Read<T> withTimestampAttribute(String timestampAttribute) {
+      return toBuilder().setTimestampAttribute(timestampAttribute).build();
     }
 
     /**
@@ -657,11 +657,11 @@ public class PubsubIO {
      * The value of the attribute can be any string that uniquely identifies this record.
      *
      * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
-     * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+     * If {@code idAttribute} is not provided, Beam cannot guarantee that no duplicate data will
      * be delivered, and deduplication of the stream will be strictly best effort.
      */
-    public Read<T> withIdLabel(String idLabel) {
-      return toBuilder().setIdLabel(idLabel).build();
+    public Read<T> withIdAttribute(String idAttribute) {
+      return toBuilder().setIdAttribute(idAttribute).build();
     }
 
     /**
@@ -718,8 +718,8 @@ public class PubsubIO {
               topicPath,
               subscriptionPath,
               getCoder(),
-              getTimestampLabel(),
-              getIdLabel(),
+              getTimestampAttribute(),
+              getIdAttribute(),
               getParseFn());
       return input.getPipeline().apply(source);
     }
@@ -727,7 +727,8 @@ public class PubsubIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
+      populateCommonDisplayData(
+          builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
 
       if (getSubscriptionProvider() != null) {
         String subscriptionString = getSubscriptionProvider().isAccessible()
@@ -757,11 +758,11 @@ public class PubsubIO {
 
     /** The name of the message attribute to publish message timestamps in. */
     @Nullable
-    abstract String getTimestampLabel();
+    abstract String getTimestampAttribute();
 
     /** The name of the message attribute to publish unique message IDs in. */
     @Nullable
-    abstract String getIdLabel();
+    abstract String getIdAttribute();
 
     /** The input type Coder. */
     @Nullable
@@ -777,9 +778,9 @@ public class PubsubIO {
     abstract static class Builder<T> {
       abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);
 
-      abstract Builder<T> setTimestampLabel(String timestampLabel);
+      abstract Builder<T> setTimestampAttribute(String timestampAttribute);
 
-      abstract Builder<T> setIdLabel(String idLabel);
+      abstract Builder<T> setIdAttribute(String idAttribute);
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
@@ -814,23 +815,23 @@ public class PubsubIO {
      * time classes, {@link Instant#Instant(long)} can be used to parse this value.
      *
      * <p>If the output from this sink is being read by another Beam pipeline, then
-     * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
-     * these timestamps from the appropriate attribute.
+     * {@link PubsubIO.Read#withTimestampAttribute(String)} can be used to ensure the other source
+     * reads these timestamps from the appropriate attribute.
      */
-    public Write<T> withTimestampLabel(String timestampLabel) {
-      return toBuilder().setTimestampLabel(timestampLabel).build();
+    public Write<T> withTimestampAttribute(String timestampAttribute) {
+      return toBuilder().setTimestampAttribute(timestampAttribute).build();
     }
 
     /**
      * Writes to Pub/Sub, adding each record's unique identifier to the published messages in an
      * attribute with the specified name. The value of the attribute is an opaque string.
      *
-     * <p>If the the output from this sink is being read by another Beam pipeline, then
-     * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
+     * <p>If the output from this sink is being read by another Beam pipeline, then {@link
+     * PubsubIO.Read#withIdAttribute(String)} can be used to ensure that the other source reads
      * these unique identifiers from the appropriate attribute.
      */
-    public Write<T> withIdLabel(String idLabel) {
-      return toBuilder().setIdLabel(idLabel).build();
+    public Write<T> withIdAttribute(String idAttribute) {
+      return toBuilder().setIdAttribute(idAttribute).build();
     }
 
     /**
@@ -864,8 +865,8 @@ public class PubsubIO {
               FACTORY,
               NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
               getCoder(),
-              getTimestampLabel(),
-              getIdLabel(),
+              getTimestampAttribute(),
+              getIdAttribute(),
               getFormatFn(),
               100 /* numShards */));
       }
@@ -875,7 +876,8 @@ public class PubsubIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
+      populateCommonDisplayData(
+          builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
     }
 
     @Override
@@ -897,9 +899,9 @@ public class PubsubIO {
       @StartBundle
       public void startBundle(Context c) throws IOException {
         this.output = new ArrayList<>();
-        // NOTE: idLabel is ignored.
+        // NOTE: idAttribute is ignored.
         this.pubsubClient =
-            FACTORY.newClient(getTimestampLabel(), null,
+            FACTORY.newClient(getTimestampAttribute(), null,
                 c.getPipelineOptions().as(PubsubOptions.class));
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 39184fb..b745422 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -69,7 +69,7 @@ class PubsubJsonClient extends PubsubClient {
 
     @Override
     public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
         throws IOException {
       Pubsub pubsub = new Builder(
           Transport.getTransport(),
@@ -82,7 +82,7 @@ class PubsubJsonClient extends PubsubClient {
           .setApplicationName(options.getAppName())
           .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
           .build();
-      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+      return new PubsubJsonClient(timestampAttribute, idAttribute, pubsub);
     }
 
     @Override
@@ -97,17 +97,17 @@ class PubsubJsonClient extends PubsubClient {
   public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
 
   /**
-   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * Attribute to use for custom timestamps, or {@literal null} if should use Pubsub publish time
    * instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   * Attribute to use for custom ids, or {@literal null} if should use Pubsub provided ids.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
   /**
    * Underlying JSON transport.
@@ -116,11 +116,11 @@ class PubsubJsonClient extends PubsubClient {
 
   @VisibleForTesting
   PubsubJsonClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       Pubsub pubsub) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.pubsub = pubsub;
   }
 
@@ -137,19 +137,19 @@ class PubsubJsonClient extends PubsubClient {
       PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
 
       Map<String, String> attributes = outgoingMessage.attributes;
-      if ((timestampLabel != null || idLabel != null) && attributes == null) {
+      if ((timestampAttribute != null || idAttribute != null) && attributes == null) {
         attributes = new TreeMap<>();
       }
       if (attributes != null) {
         pubsubMessage.setAttributes(attributes);
       }
 
-      if (timestampLabel != null) {
-        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      if (timestampAttribute != null) {
+        attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        attributes.put(idLabel, outgoingMessage.recordId);
+      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        attributes.put(idAttribute, outgoingMessage.recordId);
       }
 
       pubsubMessages.add(pubsubMessage);
@@ -188,7 +188,7 @@ class PubsubJsonClient extends PubsubClient {
 
       // Timestamp.
       long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
+          extractTimestamp(timestampAttribute, message.getMessage().getPublishTime(), attributes);
 
       // Ack id.
       String ackId = message.getAckId();
@@ -196,8 +196,8 @@ class PubsubJsonClient extends PubsubClient {
 
       // Record id, if any.
       @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
+      if (idAttribute != null && attributes != null) {
+        recordId = attributes.get(idAttribute);
       }
       if (Strings.isNullOrEmpty(recordId)) {
         // Fall back to the Pubsub provided message id.

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index 9d40e41..df90597 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -136,7 +136,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
     return new PubsubTestClientFactory() {
       @Override
       public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
           throws IOException {
         return new PubsubTestClient();
       }
@@ -182,7 +182,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
     return new PubsubTestClientFactory() {
       @Override
       public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
           throws IOException {
         return new PubsubTestClient();
       }
@@ -226,7 +226,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
 
       @Override
       public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
           throws IOException {
         return new PubsubTestClient() {
           @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 002e979..8d273ba 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -224,8 +224,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
     private final PubsubClientFactory pubsubFactory;
     private final ValueProvider<TopicPath> topic;
-    private final String timestampLabel;
-    private final String idLabel;
+    private final String timestampAttribute;
+    private final String idAttribute;
     private final int publishBatchSize;
     private final int publishBatchBytes;
 
@@ -240,12 +240,16 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     private final Counter byteCounter = SinkMetrics.bytesWritten();
 
     WriterFn(
-        PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
-        String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
+        PubsubClientFactory pubsubFactory,
+        ValueProvider<TopicPath> topic,
+        String timestampAttribute,
+        String idAttribute,
+        int publishBatchSize,
+        int publishBatchBytes) {
       this.pubsubFactory = pubsubFactory;
       this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
+      this.timestampAttribute = timestampAttribute;
+      this.idAttribute = idAttribute;
       this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
     }
@@ -267,7 +271,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     @StartBundle
     public void startBundle(Context c) throws Exception {
       checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
-      pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+      pubsubClient = pubsubFactory.newClient(timestampAttribute, idAttribute,
                                              c.getPipelineOptions().as(PubsubOptions.class));
     }
 
@@ -311,8 +315,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
             : topic.toString();
       builder.add(DisplayData.item("topic", topicString));
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
-      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
-      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+      builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
+      builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
     }
   }
 
@@ -341,14 +345,14 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    * Pubsub message publish timestamp instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
    * Pubsub metadata field holding id for each element, or {@literal null} if need to generate
    * a unique id ourselves.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
   /**
    * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
@@ -374,7 +378,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   private final Duration maxLatency;
 
   /**
-   * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal
+   * How record ids should be generated for each record (if {@link #idAttribute} is non-{@literal
    * null}).
    */
   private final RecordIdMethod recordIdMethod;
@@ -390,8 +394,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       PubsubClientFactory pubsubFactory,
       ValueProvider<TopicPath> topic,
       Coder<T> elementCoder,
-      String timestampLabel,
-      String idLabel,
+      String timestampAttribute,
+      String idAttribute,
       int numShards,
       int publishBatchSize,
       int publishBatchBytes,
@@ -401,25 +405,25 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     this.pubsubFactory = pubsubFactory;
     this.topic = topic;
     this.elementCoder = elementCoder;
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.numShards = numShards;
     this.publishBatchSize = publishBatchSize;
     this.publishBatchBytes = publishBatchBytes;
     this.maxLatency = maxLatency;
     this.formatFn = formatFn;
-    this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
+    this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE : recordIdMethod;
   }
 
   public PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
       ValueProvider<TopicPath> topic,
       Coder<T> elementCoder,
-      String timestampLabel,
-      String idLabel,
+      String timestampAttribute,
+      String idAttribute,
       SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
       int numShards) {
-    this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
+    this(pubsubFactory, topic, elementCoder, timestampAttribute, idAttribute, numShards,
          DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
          formatFn, RecordIdMethod.RANDOM);
   }
@@ -439,19 +443,19 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   /**
-   * Get the timestamp label.
+   * Get the timestamp attribute.
    */
   @Nullable
-  public String getTimestampLabel() {
-    return timestampLabel;
+  public String getTimestampAttribute() {
+    return timestampAttribute;
   }
 
   /**
-   * Get the id label.
+   * Get the id attribute.
    */
   @Nullable
-  public String getIdLabel() {
-    return idLabel;
+  public String getIdAttribute() {
+    return idAttribute;
   }
 
   /**
@@ -483,7 +487,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
          .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
          .apply(GroupByKey.<Integer, OutgoingMessage>create())
          .apply("PubsubUnboundedSink.Writer",
-             ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+             ParDo.of(new WriterFn(pubsubFactory, topic, timestampAttribute, idAttribute,
                  publishBatchSize, publishBatchBytes)));
     return PDone.in(input.getPipeline());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 6392fd2..903ae41 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -602,7 +602,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       pubsubClient =
           new AtomicReference<>(
               outer.outer.pubsubFactory.newClient(
-                  outer.outer.timestampLabel, outer.outer.idLabel, options));
+                  outer.outer.timestampAttribute, outer.outer.idAttribute, options));
       ackTimeoutMs = -1;
       safeToAckIds = new HashSet<>();
       notYetRead = new ArrayDeque<>();
@@ -1207,22 +1207,22 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     @Nullable
     private final ValueProvider<TopicPath> topic;
     @Nullable
-    private final String timestampLabel;
+    private final String timestampAttribute;
     @Nullable
-    private final String idLabel;
+    private final String idAttribute;
 
     public StatsFn(
         PubsubClientFactory pubsubFactory,
         @Nullable ValueProvider<SubscriptionPath> subscription,
         @Nullable ValueProvider<TopicPath> topic,
-        @Nullable String timestampLabel,
-        @Nullable String idLabel) {
+        @Nullable String timestampAttribute,
+        @Nullable String idAttribute) {
       checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
       this.pubsubFactory = pubsubFactory;
       this.subscription = subscription;
       this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
+      this.timestampAttribute = timestampAttribute;
+      this.idAttribute = idAttribute;
     }
 
     @ProcessElement
@@ -1247,8 +1247,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         builder.add(DisplayData.item("topic", topicString));
       }
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
-      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
-      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+      builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
+      builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
     }
   }
 
@@ -1303,14 +1303,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
    * Pubsub message publish timestamp instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
    * Pubsub metadata field holding id for each element, or {@literal null} if need to generate
    * a unique id ourselves.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
   /**
    * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be
@@ -1327,8 +1327,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<TopicPath> topic,
       @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
     checkArgument((topic == null) != (subscription == null),
                   "Exactly one of topic and subscription must be given");
@@ -1340,8 +1340,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     this.topic = topic;
     this.subscription = subscription;
     this.elementCoder = checkNotNull(elementCoder);
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.parseFn = parseFn;
   }
 
@@ -1354,10 +1354,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<TopicPath> topic,
       @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
-    this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel,
+    this(
+        null,
+        pubsubFactory,
+        project,
+        topic,
+        subscription,
+        elementCoder,
+        timestampAttribute,
+        idAttribute,
         parseFn);
   }
 
@@ -1409,19 +1417,19 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   }
 
   /**
-   * Get the timestamp label.
+   * Get the timestamp attribute.
    */
   @Nullable
-  public String getTimestampLabel() {
-    return timestampLabel;
+  public String getTimestampAttribute() {
+    return timestampAttribute;
   }
 
   /**
-   * Get the id label.
+   * Get the id attribute.
    */
   @Nullable
-  public String getIdLabel() {
-    return idLabel;
+  public String getIdAttribute() {
+    return idAttribute;
   }
 
   /**
@@ -1438,13 +1446,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
                 .apply(Read.from(new PubsubSource<T>(this)))
                 .apply("PubsubUnboundedSource.Stats",
                     ParDo.of(new StatsFn<T>(
-                        pubsubFactory, subscription, topic, timestampLabel, idLabel)));
+                        pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
   }
 
   private SubscriptionPath createRandomSubscription(PipelineOptions options) {
     try {
       try (PubsubClient pubsubClient =
-          pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) {
+          pubsubFactory.newClient(
+              timestampAttribute, idAttribute, options.as(PubsubOptions.class))) {
         checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
         checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
         SubscriptionPath subscriptionPath =

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index 14c36f9..d37235f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -45,8 +45,8 @@ public class PubsubClientTest {
   //
 
   private long parse(String timestamp) {
-    Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
-    return PubsubClient.extractTimestamp("myLabel", null, map);
+    Map<String, String> map = ImmutableMap.of("myAttribute", timestamp);
+    return PubsubClient.extractTimestamp("myAttribute", null, map);
   }
 
   private void roundTripRfc339(String timestamp) {
@@ -58,106 +58,106 @@ public class PubsubClientTest {
   }
 
   @Test
-  public void noTimestampLabelReturnsPubsubPublish() {
+  public void noTimestampAttributeReturnsPubsubPublish() {
     final long time = 987654321L;
     long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
     assertEquals(time, timestamp);
   }
 
   @Test
-  public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+  public void noTimestampAttributeAndInvalidPubsubPublishThrowsError() {
     thrown.expect(NumberFormatException.class);
     PubsubClient.extractTimestamp(null, "not-a-date", null);
   }
 
   @Test
-  public void timestampLabelWithNullAttributesThrowsError() {
+  public void timestampAttributeWithNullAttributesThrowsError() {
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
-    PubsubClient.extractTimestamp("myLabel", null, null);
+    thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
+    PubsubClient.extractTimestamp("myAttribute", null, null);
   }
 
   @Test
-  public void timestampLabelSetWithMissingAttributeThrowsError() {
+  public void timestampAttributeSetWithMissingAttributeThrowsError() {
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+    thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
     Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
-    PubsubClient.extractTimestamp("myLabel", null, map);
+    PubsubClient.extractTimestamp("myAttribute", null, map);
   }
 
   @Test
-  public void timestampLabelParsesMillisecondsSinceEpoch() {
+  public void timestampAttributeParsesMillisecondsSinceEpoch() {
     long time = 1446162101123L;
-    Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
-    long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+    Map<String, String> map = ImmutableMap.of("myAttribute", String.valueOf(time));
+    long timestamp = PubsubClient.extractTimestamp("myAttribute", null, map);
     assertEquals(time, timestamp);
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Seconds() {
+  public void timestampAttributeParsesRfc3339Seconds() {
     roundTripRfc339("2015-10-29T23:41:41Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Tenths() {
+  public void timestampAttributeParsesRfc3339Tenths() {
     roundTripRfc339("2015-10-29T23:41:41.1Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Hundredths() {
+  public void timestampAttributeParsesRfc3339Hundredths() {
     roundTripRfc339("2015-10-29T23:41:41.12Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Millis() {
+  public void timestampAttributeParsesRfc3339Millis() {
     roundTripRfc339("2015-10-29T23:41:41.123Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Micros() {
+  public void timestampAttributeParsesRfc3339Micros() {
     // Note: micros part 456/1000 is dropped.
     truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339MicrosRounding() {
+  public void timestampAttributeParsesRfc3339MicrosRounding() {
     // Note: micros part 999/1000 is dropped, not rounded up.
     truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
   }
 
   @Test
-  public void timestampLabelWithInvalidFormatThrowsError() {
+  public void timestampAttributeWithInvalidFormatThrowsError() {
     thrown.expect(NumberFormatException.class);
     parse("not-a-timestamp");
   }
 
   @Test
-  public void timestampLabelWithInvalidFormat2ThrowsError() {
+  public void timestampAttributeWithInvalidFormat2ThrowsError() {
     thrown.expect(NumberFormatException.class);
     parse("null");
   }
 
   @Test
-  public void timestampLabelWithInvalidFormat3ThrowsError() {
+  public void timestampAttributeWithInvalidFormat3ThrowsError() {
     thrown.expect(NumberFormatException.class);
     parse("2015-10");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339WithSmallYear() {
+  public void timestampAttributeParsesRfc3339WithSmallYear() {
     // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
     // This is therefore a "small year" until this difference is reconciled.
     roundTripRfc339("1582-10-15T01:23:45.123Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339WithLargeYear() {
+  public void timestampAttributeParsesRfc3339WithLargeYear() {
     // Year 9999 in range.
     roundTripRfc339("9999-10-29T23:41:41.123999Z");
   }
 
   @Test
-  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+  public void timestampAttributeRfc3339WithTooLargeYearThrowsError() {
     thrown.expect(NumberFormatException.class);
     // Year 10000 out of range.
     parse("10000-10-29T23:41:41.123999Z");

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 63721dc..87d6029 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -72,8 +72,8 @@ public class PubsubGrpcClientTest {
   private static final long REQ_TIME = 1234L;
   private static final long PUB_TIME = 3456L;
   private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final String MESSAGE_ID = "testMessageId";
   private static final String DATA = "testData";
   private static final String RECORD_ID = "testRecordId";
@@ -87,7 +87,9 @@ public class PubsubGrpcClientTest {
         PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt());
     inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build();
     testCredentials = new TestCredential();
-    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials);
+    client =
+        new PubsubGrpcClient(
+            TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, inProcessChannel, testCredentials);
   }
 
   @After
@@ -117,9 +119,9 @@ public class PubsubGrpcClientTest {
                      .setPublishTime(timestamp)
                      .putAllAttributes(ATTRIBUTES)
                      .putAllAttributes(
-                         ImmutableMap.of(TIMESTAMP_LABEL,
+                         ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
                                          String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, RECORD_ID))
+                             ID_ATTRIBUTE, RECORD_ID))
                      .build();
     ReceivedMessage expectedReceivedMessage =
         ReceivedMessage.newBuilder()
@@ -167,8 +169,8 @@ public class PubsubGrpcClientTest {
                      .setData(ByteString.copyFrom(DATA.getBytes()))
                      .putAllAttributes(ATTRIBUTES)
                      .putAllAttributes(
-                         ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, RECORD_ID))
+                         ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME),
+                                         ID_ATTRIBUTE, RECORD_ID))
                      .build();
     final PublishRequest expectedRequest =
         PublishRequest.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 20039d4..5f06b88 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -93,14 +93,14 @@ public class PubsubIOTest {
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .fromTopic(StaticValueProvider.of(topic))
-        .withTimestampLabel("myTimestamp")
-        .withIdLabel("myId");
+        .withTimestampAttribute("myTimestamp")
+        .withIdAttribute("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("timestampAttribute", "myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
   }
 
   @Test
@@ -110,14 +110,14 @@ public class PubsubIOTest {
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .fromSubscription(StaticValueProvider.of(subscription))
-        .withTimestampLabel("myTimestamp")
-        .withIdLabel("myId");
+        .withTimestampAttribute("myTimestamp")
+        .withIdAttribute("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("subscription", subscription));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("timestampAttribute", "myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
   }
 
   @Test
@@ -168,14 +168,14 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     PubsubIO.Write<?> write = PubsubIO.<String>write()
         .to(topic)
-        .withTimestampLabel("myTimestamp")
-        .withIdLabel("myId");
+        .withTimestampAttribute("myTimestamp")
+        .withIdAttribute("myId");
 
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("timestampAttribute", "myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index d290994..578f814 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -58,8 +58,8 @@ public class PubsubJsonClientTest {
   private static final long REQ_TIME = 1234L;
   private static final long PUB_TIME = 3456L;
   private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final String MESSAGE_ID = "testMessageId";
   private static final String DATA = "testData";
   private static final String RECORD_ID = "testRecordId";
@@ -68,7 +68,7 @@ public class PubsubJsonClientTest {
   @Before
   public void setup() throws IOException {
     mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
-    client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+    client = new PubsubJsonClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, mockPubsub);
   }
 
   @After
@@ -88,8 +88,8 @@ public class PubsubJsonClientTest {
         .encodeData(DATA.getBytes())
         .setPublishTime(String.valueOf(PUB_TIME))
         .setAttributes(
-            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, RECORD_ID));
+            ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME),
+                ID_ATTRIBUTE, RECORD_ID));
     ReceivedMessage expectedReceivedMessage =
         new ReceivedMessage().setMessage(expectedPubsubMessage)
                              .setAckId(ACK_ID);
@@ -117,8 +117,8 @@ public class PubsubJsonClientTest {
         .encodeData(DATA.getBytes())
         .setAttributes(
             ImmutableMap.<String, String> builder()
-                    .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
-                    .put(ID_LABEL, RECORD_ID)
+                    .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+                    .put(ID_ATTRIBUTE, RECORD_ID)
                     .put("k", "v").build());
     PublishRequest expectedRequest = new PublishRequest()
         .setMessages(ImmutableList.of(expectedPubsubMessage));

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index be425d4..580ada9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
   private static final Map<String, String> ATTRIBUTES =
           ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
   private static final long TIMESTAMP = 1234L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final int NUM_SHARDS = 10;
 
   private static class Stamp extends DoFn<String, String> {
@@ -99,7 +99,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
                                                       ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
-              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize, batchBytes,
               Duration.standardSeconds(2),
               new SimpleFunction<String, PubsubIO.PubsubMessage>() {
                 @Override
@@ -135,7 +135,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
                                                       ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
-              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize, batchBytes,
               Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
@@ -170,7 +170,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
                                                       ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
-              StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+              StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE,
               NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
               null, RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(data))

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 949ba4f..dc66ea1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -70,8 +70,8 @@ public class PubsubUnboundedSourceTest {
   private static final String DATA = "testData";
   private static final long TIMESTAMP = 1234L;
   private static final long REQ_TIME = 6373L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final String ACK_ID = "testAckId";
   private static final String RECORD_ID = "testRecordId";
   private static final int ACK_TIMEOUT_S = 60;
@@ -96,7 +96,7 @@ public class PubsubUnboundedSourceTest {
     PubsubUnboundedSource<String> source =
         new PubsubUnboundedSource<>(
             clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
-            StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
+            StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null);
     primSource = new PubsubSource<>(source);
   }
 


[5/9] beam git commit: Converts PubsubIO.Write to AutoValue

Posted by jk...@apache.org.
Converts PubsubIO.Write to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df6ef969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df6ef969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df6ef969

Branch: refs/heads/master
Commit: df6ef969d6df5c42d091cc00997b0ed7680315fb
Parents: 9e81548
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:34:11 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 166 +++++++------------
 1 file changed, 61 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/df6ef969/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 69a5bd6..5702af1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -461,8 +461,9 @@ public class PubsubIO {
     return new AutoValue_PubsubIO_Read.Builder<T>().build();
   }
 
+  /** Returns a {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
   public static <T> Write<T> write() {
-    return new Write<>();
+    return new AutoValue_PubsubIO_Write.Builder<T>().build();
   }
 
   /** Implementation of {@link #read}. */
@@ -696,43 +697,47 @@ public class PubsubIO {
   private PubsubIO() {}
 
 
-  /**
-   * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
-   * to a Cloud Pub/Sub stream.
-   */
-  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
-
-    /** The Cloud Pub/Sub topic to publish to. */
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
-    private final ValueProvider<PubsubTopic> topic;
+    abstract ValueProvider<PubsubTopic> getTopicProvider();
+
     /** The name of the message attribute to publish message timestamps in. */
     @Nullable
-    private final String timestampLabel;
+    abstract String getTimestampLabel();
+
     /** The name of the message attribute to publish unique message IDs in. */
     @Nullable
-    private final String idLabel;
+    abstract String getIdLabel();
+
     /** The input type Coder. */
-    private final Coder<T> coder;
+    @Nullable
+    abstract Coder<T> getCoder();
+
     /** The format function for input PubsubMessage objects. */
-    SimpleFunction<T, PubsubMessage> formatFn;
+    @Nullable
+    abstract SimpleFunction<T, PubsubMessage> getFormatFn();
 
-    private Write() {
-      this(null, null, null, null, null, null);
-    }
+    abstract Builder<T> toBuilder();
 
-    private Write(
-        String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
-        String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
-      super(name);
-      this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
-      this.coder = coder;
-      this.formatFn = formatFn;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);
+
+      abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+      abstract Builder<T> setIdLabel(String idLabel);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn);
+
+      abstract Write<T> build();
     }
 
     /**
-     * Creates a transform that publishes to the specified topic.
+     * Publishes to the specified topic.
      *
      * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
      * {@code topic} string.
@@ -745,14 +750,15 @@ public class PubsubIO {
      * Like {@code topic()} but with a {@link ValueProvider}.
      */
     public Write<T> topic(ValueProvider<String> topic) {
-      return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, idLabel, coder, formatFn);
+      return toBuilder()
+          .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+          .build();
     }
 
     /**
-     * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published
-     * messages in an attribute with the specified name. The value of the attribute will be a number
-     * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
+     * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
+     * with the specified name. The value of the attribute will be a number representing the number
+     * of milliseconds since the Unix epoch. For example, if using the Joda
      * time classes, {@link Instant#Instant(long)} can be used to parse this value.
      *
      * <p>If the output from this sink is being read by another Beam pipeline, then
@@ -760,32 +766,27 @@ public class PubsubIO {
      * these timestamps from the appropriate attribute.
      */
     public Write<T> timestampLabel(String timestampLabel) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setTimestampLabel(timestampLabel).build();
     }
 
     /**
-     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
-     * published messages in an attribute with the specified name. The value of the attribute is an
-     * opaque string.
+     * Writes to Pub/Sub, adding each record's unique identifier to the published messages in an
+     * attribute with the specified name. The value of the attribute is an opaque string.
      *
      * <p>If the the output from this sink is being read by another Beam pipeline, then
      * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
      * these unique identifiers from the appropriate attribute.
      */
     public Write<T> idLabel(String idLabel) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setIdLabel(idLabel).build();
     }
 
     /**
-     * Returns a new transform that's like this one
-     * but that uses the given {@link Coder} to encode each of
-     * the elements of the input {@link PCollection} into an
-     * output record.
-     *
-     * <p>Does not modify this object.
+     * Uses the given {@link Coder} to encode each of the elements of the input {@link PCollection}
+     * into an output record.
      */
     public Write<T> withCoder(Coder<T> coder) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setCoder(coder).build();
     }
 
     /**
@@ -794,12 +795,12 @@ public class PubsubIO {
      * to separately set the PubSub message's payload and attributes.
      */
     public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
-      return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+      return toBuilder().setFormatFn(formatFn).build();
     }
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (topic == null) {
+      if (getTopicProvider() == null) {
         throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
       }
       switch (input.isBounded()) {
@@ -809,11 +810,11 @@ public class PubsubIO {
         case UNBOUNDED:
           return input.apply(new PubsubUnboundedSink<T>(
               FACTORY,
-              NestedValueProvider.of(topic, new TopicPathTranslator()),
-              coder,
-              timestampLabel,
-              idLabel,
-              formatFn,
+              NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
+              getCoder(),
+              getTimestampLabel(),
+              getIdLabel(),
+              getFormatFn(),
               100 /* numShards */));
       }
       throw new RuntimeException(); // cases are exhaustive.
@@ -822,7 +823,7 @@ public class PubsubIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
     }
 
     @Override
@@ -831,54 +832,6 @@ public class PubsubIO {
     }
 
     /**
-     * Returns the PubSub topic being written to.
-     */
-    @Nullable
-    public PubsubTopic getTopic() {
-      return (topic == null) ? null : topic.get();
-    }
-
-    /**
-     * Returns the {@link ValueProvider} for the topic being written to.
-     */
-    @Nullable
-    public ValueProvider<PubsubTopic> getTopicProvider() {
-      return topic;
-    }
-
-    /**
-     * Returns the timestamp label.
-     */
-    @Nullable
-    public String getTimestampLabel() {
-      return timestampLabel;
-    }
-
-    /**
-     * Returns the id label.
-     */
-    @Nullable
-    public String getIdLabel() {
-      return idLabel;
-    }
-
-    /**
-     * Returns the output coder.
-     */
-    @Nullable
-    public Coder<T> getCoder() {
-      return coder;
-    }
-
-    /**
-     * Returns the formatting function used if publishing attributes.
-     */
-    @Nullable
-    public SimpleFunction<T, PubsubMessage> getFormatFn() {
-      return formatFn;
-    }
-
-    /**
      * Writer to Pubsub which batches messages from bounded collections.
      *
      * <p>Public so can be suppressed by runners.
@@ -894,7 +847,7 @@ public class PubsubIO {
         this.output = new ArrayList<>();
         // NOTE: idLabel is ignored.
         this.pubsubClient =
-            FACTORY.newClient(timestampLabel, null,
+            FACTORY.newClient(getTimestampLabel(), null,
                 c.getPipelineOptions().as(PubsubOptions.class));
       }
 
@@ -902,8 +855,8 @@ public class PubsubIO {
       public void processElement(ProcessContext c) throws IOException {
         byte[] payload = null;
         Map<String, String> attributes = null;
-        if (formatFn != null) {
-          PubsubMessage message = formatFn.apply(c.element());
+        if (getFormatFn() != null) {
+          PubsubMessage message = getFormatFn().apply(c.element());
           payload = message.getMessage();
           attributes = message.getAttributeMap();
         } else {
@@ -930,9 +883,12 @@ public class PubsubIO {
       }
 
       private void publish() throws IOException {
-        int n = pubsubClient.publish(
-            PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
-            output);
+        PubsubTopic topic = getTopicProvider().get();
+        int n =
+            pubsubClient.publish(
+                PubsubClient.topicPathFromName(
+                    topic.project, topic.topic),
+                output);
         checkState(n == output.size());
         output.clear();
       }


[3/9] beam git commit: Remove override of topic by subscription and vice versa

Posted by jk...@apache.org.
Remove override of topic by subscription and vice versa


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e815485
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e815485
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e815485

Branch: refs/heads/master
Commit: 9e815485b979b99b190c4acf1098ab054492ae9e
Parents: 5d8fbc4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 27 17:04:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    |  4 ----
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java    | 13 ++++++++-----
 2 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9e815485/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 20aed6d..69a5bd6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -534,8 +534,6 @@ public class PubsubIO {
       return toBuilder()
           .setSubscriptionProvider(
               NestedValueProvider.of(subscription, new SubscriptionTranslator()))
-          /* reset topic to null */
-          .setTopicProvider(null)
           .build();
     }
 
@@ -564,8 +562,6 @@ public class PubsubIO {
       }
       return toBuilder()
           .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
-          /* reset subscription to null */
-          .setSubscriptionProvider(null)
           .build();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9e815485/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index f44fffc..69d989f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -146,16 +146,19 @@ public class PubsubIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData;
-    PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+    PubsubIO.Read<String> baseRead = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
 
     // Reading from a subscription.
-    read = read.fromSubscription("projects/project/subscriptions/subscription");
+    PubsubIO.Read<String> read =
+        baseRead.fromSubscription("projects/project/subscriptions/subscription");
     displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("PubsubIO.Read should include the subscription in its primitive display data",
-        displayData, hasItem(hasDisplayItem("subscription")));
+    assertThat(
+        "PubsubIO.Read should include the subscription in its primitive display data",
+        displayData,
+        hasItem(hasDisplayItem("subscription")));
 
     // Reading from a topic.
-    read = read.fromTopic("projects/project/topics/topic");
+    read = baseRead.fromTopic("projects/project/topics/topic");
     displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("PubsubIO.Read should include the topic in its primitive display data",
         displayData, hasItem(hasDisplayItem("topic")));


[2/9] beam git commit: Adds PubsubIO.writeStrings(), writeProtos(), writeAvros()

Posted by jk...@apache.org.
Adds PubsubIO.writeStrings(), writeProtos(), writeAvros()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0651145
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0651145
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0651145

Branch: refs/heads/master
Commit: f0651145fea31854ab83fc064a3c7866251cc0a4
Parents: 079353d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:54:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f0651145/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 9604864..3a7522e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -474,8 +474,8 @@ public class PubsubIO {
   }
 
   /**
-   * Returns A {@link PTransform} that continuously reads binary encoded protos of the given type
-   * from a Google Cloud Pub/Sub stream.
+   * Returns a {@link PTransform} that continuously reads binary encoded protobuf messages of the
+   * given type from a Google Cloud Pub/Sub stream.
    */
   public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
     return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
@@ -494,6 +494,30 @@ public class PubsubIO {
     return new AutoValue_PubsubIO_Write.Builder<T>().build();
   }
 
+  /**
+   * Returns a {@link PTransform} that writes UTF-8 encoded strings to a Google Cloud Pub/Sub
+   * stream.
+   */
+  public static Write<String> writeStrings() {
+    return PubsubIO.<String>write().withCoder(StringUtf8Coder.of());
+  }
+
+  /**
+   * Returns a {@link PTransform} that writes binary encoded protobuf messages of a given type
+   * to a Google Cloud Pub/Sub stream.
+   */
+  public static <T extends Message> Write<T> writeProtos(Class<T> messageClass) {
+    return PubsubIO.<T>write().withCoder(ProtoCoder.of(messageClass));
+  }
+
+  /**
+   * Returns a {@link PTransform} that writes binary encoded Avro messages of a given type
+   * to a Google Cloud Pub/Sub stream.
+   */
+  public static <T> Write<T> writeAvros(Class<T> clazz) {
+    return PubsubIO.<T>write().withCoder(AvroCoder.of(clazz));
+  }
+
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {


[4/9] beam git commit: Converts PubsubIO.Read to AutoValue

Posted by jk...@apache.org.
Converts PubsubIO.Read to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4d04606
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4d04606
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4d04606

Branch: refs/heads/master
Commit: f4d04606c105ca45a7754516781cb72b4c818baf
Parents: f5e3f52
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:14:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 244 +++++++------------
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |   8 +-
 2 files changed, 95 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index f0926d4..3c76942 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -455,64 +456,61 @@ public class PubsubIO {
     }
   }
 
+  /** Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
   public static <T> Read<T> read() {
-    return new Read<>();
+    return new AutoValue_PubsubIO_Read.Builder<T>().build();
   }
 
   public static <T> Write<T> write() {
     return new Write<>();
   }
 
-  /**
-   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
-   * returns a {@link PCollection} of {@link String Strings} containing the items from
-   * the stream.
-   */
-  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
-    /** The Cloud Pub/Sub topic to read from. */
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable
-    private final ValueProvider<PubsubTopic> topic;
+    abstract ValueProvider<PubsubTopic> getTopicProvider();
 
-    /** The Cloud Pub/Sub subscription to read from. */
     @Nullable
-    private final ValueProvider<PubsubSubscription> subscription;
+    abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();
 
     /** The name of the message attribute to read timestamps from. */
     @Nullable
-    private final String timestampLabel;
+    abstract String getTimestampLabel();
 
     /** The name of the message attribute to read unique message IDs from. */
     @Nullable
-    private final String idLabel;
+    abstract String getIdLabel();
 
     /** The coder used to decode each record. */
     @Nullable
-    private final Coder<T> coder;
+    abstract Coder<T> getCoder();
 
     /** User function for parsing PubsubMessage object. */
-    SimpleFunction<PubsubMessage, T> parseFn;
+    @Nullable
+    abstract SimpleFunction<PubsubMessage, T> getParseFn();
 
-    private Read() {
-      this(null, null, null, null, null, null, null);
-    }
+    abstract Builder<T> toBuilder();
 
-    private Read(String name, ValueProvider<PubsubSubscription> subscription,
-        ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
-        String idLabel,
-        SimpleFunction<PubsubMessage, T> parseFn) {
-      super(name);
-      this.subscription = subscription;
-      this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.coder = coder;
-      this.idLabel = idLabel;
-      this.parseFn = parseFn;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);
+
+      abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
+
+      abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+      abstract Builder<T> setIdLabel(String idLabel);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
+
+      abstract Read<T> build();
     }
 
     /**
-     * Returns a transform that's like this one but reading from the
-     * given subscription.
+     * Reads from the given subscription.
      *
      * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
      * of the {@code subscription} string.
@@ -520,8 +518,6 @@ public class PubsubIO {
      * <p>Multiple readers reading from the same subscription will each receive
      * some arbitrary portion of the data.  Most likely, separate readers should
      * use their own subscriptions.
-     *
-     * <p>Does not modify this object.
      */
     public Read<T> subscription(String subscription) {
       return subscription(StaticValueProvider.of(subscription));
@@ -535,9 +531,12 @@ public class PubsubIO {
         // Validate.
         PubsubSubscription.fromPath(subscription.get());
       }
-      return new Read<>(
-          name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
-          null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
+      return toBuilder()
+          .setSubscriptionProvider(
+              NestedValueProvider.of(subscription, new SubscriptionTranslator()))
+          /* reset topic to null */
+          .setTopicProvider(null)
+          .build();
     }
 
     /**
@@ -563,15 +562,16 @@ public class PubsubIO {
         // Validate.
         PubsubTopic.fromPath(topic.get());
       }
-      return new Read<>(name, null /* reset subscription to null */,
-          NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, coder, idLabel, parseFn);
+      return toBuilder()
+          .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+          /* reset subscription to null */
+          .setSubscriptionProvider(null)
+          .build();
     }
 
     /**
-     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
-     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
-     * parameter specifies the name of the attribute that contains the timestamp.
+     * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
+     * attributes, specifies the name of the attribute that contains the timestamp.
      *
      * <p>The timestamp value is expected to be represented in the attribute as either:
      *
@@ -599,88 +599,90 @@ public class PubsubIO {
      * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
      */
     public Read<T> timestampLabel(String timestampLabel) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setTimestampLabel(timestampLabel).build();
     }
 
     /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
-     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
-     * parameter specifies the attribute name. The value of the attribute can be any string
-     * that uniquely identifies this record.
+     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
+     * message attributes, specifies the name of the attribute containing the unique identifier.
+     * The value of the attribute can be any string that uniquely identifies this record.
      *
      * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
      * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
      * be delivered, and deduplication of the stream will be strictly best effort.
      */
     public Read<T> idLabel(String idLabel) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setIdLabel(idLabel).build();
     }
 
     /**
-     * Returns a transform that's like this one but that uses the given
-     * {@link Coder} to decode each record into a value of type {@code T}.
-     *
-     * <p>Does not modify this object.
+     * Uses the given {@link Coder} to decode each record into a value of type {@code T}.
      */
     public Read<T> withCoder(Coder<T> coder) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setCoder(coder).build();
     }
 
     /**
-     * Causes the source to return a PubsubMessage that includes Pubsub attributes.
-     * The user must supply a parsing function to transform the PubsubMessage into an output type.
+     * Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
+     * given parsing function to transform the PubsubMessage into an output type.
      * A Coder for the output type T must be registered or set on the output via
      * {@link PCollection#setCoder(Coder)}.
      */
     public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
-      return new Read<T>(
-          name, subscription, topic, timestampLabel, coder, idLabel,
-          parseFn);
+      return toBuilder().setParseFn(parseFn).build();
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (topic == null && subscription == null) {
-        throw new IllegalStateException("Need to set either the topic or the subscription for "
-            + "a PubsubIO.Read transform");
+      if (getTopicProvider() == null && getSubscriptionProvider() == null) {
+        throw new IllegalStateException(
+            "Need to set either the topic or the subscription for " + "a PubsubIO.Read transform");
       }
-      if (topic != null && subscription != null) {
-        throw new IllegalStateException("Can't set both the topic and the subscription for "
-            + "a PubsubIO.Read transform");
+      if (getTopicProvider() != null && getSubscriptionProvider() != null) {
+        throw new IllegalStateException(
+            "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform");
       }
-      if (coder == null) {
-        throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
-            + "the withCoder method.");
+      if (getCoder() == null) {
+        throw new IllegalStateException(
+            "PubsubIO.Read requires that a coder be set using " + "the withCoder method.");
       }
 
-      @Nullable ValueProvider<ProjectPath> projectPath =
-          topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
-      @Nullable ValueProvider<TopicPath> topicPath =
-          topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
-      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
-          subscription == null
+      @Nullable
+      ValueProvider<ProjectPath> projectPath =
+          getTopicProvider() == null
               ? null
-              : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
-      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
-              FACTORY, projectPath, topicPath, subscriptionPath,
-              coder, timestampLabel, idLabel, parseFn);
+              : NestedValueProvider.of(getTopicProvider(), new ProjectPathTranslator());
+      @Nullable
+      ValueProvider<TopicPath> topicPath =
+          getTopicProvider() == null
+              ? null
+              : NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator());
+      @Nullable
+      ValueProvider<SubscriptionPath> subscriptionPath =
+          getSubscriptionProvider() == null
+              ? null
+              : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
+      PubsubUnboundedSource<T> source =
+          new PubsubUnboundedSource<T>(
+              FACTORY,
+              projectPath,
+              topicPath,
+              subscriptionPath,
+              getCoder(),
+              getTimestampLabel(),
+              getIdLabel(),
+              getParseFn());
       return input.getPipeline().apply(source);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
 
-      if (subscription != null) {
-        String subscriptionString = subscription.isAccessible()
-            ? subscription.get().asPath() : subscription.toString();
+      if (getSubscriptionProvider() != null) {
+        String subscriptionString = getSubscriptionProvider().isAccessible()
+            ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString();
         builder.add(DisplayData.item("subscription", subscriptionString)
             .withLabel("Pubsub Subscription"));
       }
@@ -688,72 +690,8 @@ public class PubsubIO {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return coder;
+      return getCoder();
     }
-
-    /**
-     * Get the topic being read from.
-     */
-    @Nullable
-    public PubsubTopic getTopic() {
-      return topic == null ? null : topic.get();
-    }
-
-    /**
-     * Get the {@link ValueProvider} for the topic being read from.
-     */
-    public ValueProvider<PubsubTopic> getTopicProvider() {
-      return topic;
-    }
-
-    /**
-     * Get the subscription being read from.
-     */
-    @Nullable
-    public PubsubSubscription getSubscription() {
-      return subscription == null ? null : subscription.get();
-    }
-
-    /**
-     * Get the {@link ValueProvider} for the subscription being read from.
-     */
-    public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
-      return subscription;
-    }
-
-    /**
-     * Get the timestamp label.
-     */
-    @Nullable
-    public String getTimestampLabel() {
-      return timestampLabel;
-    }
-
-    /**
-     * Get the id label.
-     */
-    @Nullable
-    public String getIdLabel() {
-      return idLabel;
-    }
-
-
-    /**
-     * Get the {@link Coder} used for the transform's output.
-     */
-    @Nullable
-    public Coder<T> getCoder() {
-      return coder;
-    }
-
-    /**
-     * Get the parse function used for PubSub attributes.
-     */
-    @Nullable
-    public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
-      return parseFn;
-    }
-
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 6e9922c..7fe6e26 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -126,8 +126,8 @@ public class PubsubIOTest {
     String subscription = "projects/project/subscriptions/subscription";
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .subscription(StaticValueProvider.of(subscription));
-    assertNull(read.getTopic());
-    assertNotNull(read.getSubscription());
+    assertNull(read.getTopicProvider());
+    assertNotNull(read.getSubscriptionProvider());
     assertNotNull(DisplayData.from(read));
   }
 
@@ -136,8 +136,8 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic));
-    assertNotNull(read.getTopic());
-    assertNull(read.getSubscription());
+    assertNotNull(read.getTopicProvider());
+    assertNull(read.getSubscriptionProvider());
     assertNotNull(DisplayData.from(read));
   }
 


[9/9] beam git commit: This closes #2750

Posted by jk...@apache.org.
This closes #2750


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14d60b26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14d60b26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14d60b26

Branch: refs/heads/master
Commit: 14d60b26e7e9b8a578038599341e66ccd99d012b
Parents: f5e3f52 8853d53
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 29 15:17:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 15:17:48 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |   6 +-
 .../examples/complete/game/LeaderBoard.java     |   6 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  18 +-
 .../org/apache/beam/sdk/util/PropertyNames.java |   4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java    |  42 +-
 .../sdk/io/gcp/pubsub/PubsubGrpcClient.java     |  36 +-
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 508 +++++++++----------
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     |  36 +-
 .../sdk/io/gcp/pubsub/PubsubTestClient.java     |   6 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  58 ++-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |  65 ++-
 .../sdk/io/gcp/pubsub/PubsubClientTest.java     |  50 +-
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java |  16 +-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |  78 +--
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java |  14 +-
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  |  10 +-
 .../gcp/pubsub/PubsubUnboundedSourceTest.java   |   6 +-
 17 files changed, 459 insertions(+), 500 deletions(-)
----------------------------------------------------------------------



[8/9] beam git commit: Renames PubsubIO.Write builder methods to be style guide compliant

Posted by jk...@apache.org.
Renames PubsubIO.Write builder methods to be style guide compliant


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/42c975ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/42c975ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/42c975ee

Branch: refs/heads/master
Commit: 42c975ee533a63be750da2e8de1925b42efd2cad
Parents: df6ef96
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:41:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java     | 12 ++++++------
 .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 10 +++++-----
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/42c975ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5702af1..99df3c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -742,14 +742,14 @@ public class PubsubIO {
      * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
      * {@code topic} string.
      */
-    public Write<T> topic(String topic) {
-      return topic(StaticValueProvider.of(topic));
+    public Write<T> to(String topic) {
+      return to(StaticValueProvider.of(topic));
     }
 
     /**
      * Like {@code topic()} but with a {@link ValueProvider}.
      */
-    public Write<T> topic(ValueProvider<String> topic) {
+    public Write<T> to(ValueProvider<String> topic) {
       return toBuilder()
           .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
           .build();
@@ -765,7 +765,7 @@ public class PubsubIO {
      * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
      * these timestamps from the appropriate attribute.
      */
-    public Write<T> timestampLabel(String timestampLabel) {
+    public Write<T> withTimestampLabel(String timestampLabel) {
       return toBuilder().setTimestampLabel(timestampLabel).build();
     }
 
@@ -777,7 +777,7 @@ public class PubsubIO {
      * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
      * these unique identifiers from the appropriate attribute.
      */
-    public Write<T> idLabel(String idLabel) {
+    public Write<T> withIdLabel(String idLabel) {
       return toBuilder().setIdLabel(idLabel).build();
     }
 
@@ -794,7 +794,7 @@ public class PubsubIO {
      * function translates the input type T to a PubsubMessage object, which is used by the sink
      * to separately set the PubSub message's payload and attributes.
      */
-    public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
+    public Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
       return toBuilder().setFormatFn(formatFn).build();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/42c975ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 69d989f..f896bfc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -52,7 +52,7 @@ public class PubsubIOTest {
     assertEquals("PubsubIO.Read",
         PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
     assertEquals("PubsubIO.Write",
-        PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
+        PubsubIO.<String>write().to("projects/myproject/topics/mytopic").getName());
   }
 
   @Test
@@ -168,9 +168,9 @@ public class PubsubIOTest {
   public void testWriteDisplayData() {
     String topic = "projects/project/topics/topic";
     PubsubIO.Write<?> write = PubsubIO.<String>write()
-        .topic(topic)
-        .timestampLabel("myTimestamp")
-        .idLabel("myId");
+        .to(topic)
+        .withTimestampLabel("myTimestamp")
+        .withIdLabel("myId");
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -183,7 +183,7 @@ public class PubsubIOTest {
   @Category(ValidatesRunner.class)
   public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");
+    PubsubIO.Write<?> write = PubsubIO.<String>write().to("projects/project/topics/topic");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("PubsubIO.Write should include the topic in its primitive display data",


[7/9] beam git commit: Renames PubsubIO.Read builder methods to be style guide compliant

Posted by jk...@apache.org.
Renames PubsubIO.Read builder methods to be style guide compliant


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d8fbc4c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d8fbc4c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d8fbc4c

Branch: refs/heads/master
Commit: 5d8fbc4c4d87f75ea84a40c2ee36531eb0eda26f
Parents: f4d0460
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:19:37 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../examples/complete/game/LeaderBoard.java     |  2 +-
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 24 ++++++-------
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    | 38 ++++++++++----------
 4 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index e0048b7..d95eb06 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -253,7 +253,7 @@ public class GameStats extends LeaderBoard {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
         .apply(PubsubIO.<String>read()
-            .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
             .withCoder(StringUtf8Coder.of()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 96f4291..a87468a 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -192,7 +192,7 @@ public class LeaderBoard extends HourlyTeamScore {
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
         .apply(PubsubIO.<String>read()
-            .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
             .withCoder(StringUtf8Coder.of()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3c76942..20aed6d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -519,14 +519,14 @@ public class PubsubIO {
      * some arbitrary portion of the data.  Most likely, separate readers should
      * use their own subscriptions.
      */
-    public Read<T> subscription(String subscription) {
-      return subscription(StaticValueProvider.of(subscription));
+    public Read<T> fromSubscription(String subscription) {
+      return fromSubscription(StaticValueProvider.of(subscription));
     }
 
     /**
      * Like {@code subscription()} but with a {@link ValueProvider}.
      */
-    public Read<T> subscription(ValueProvider<String> subscription) {
+    public Read<T> fromSubscription(ValueProvider<String> subscription) {
       if (subscription.isAccessible()) {
         // Validate.
         PubsubSubscription.fromPath(subscription.get());
@@ -541,7 +541,7 @@ public class PubsubIO {
 
     /**
      * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
-     * with {@link #subscription(String)}.
+     * with {@link #fromSubscription(String)}.
      *
      * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
      * of the {@code topic} string.
@@ -550,14 +550,14 @@ public class PubsubIO {
      * pipeline is started. Any data published on the topic before the pipeline is started will
      * not be read by the runner.
      */
-    public Read<T> topic(String topic) {
-      return topic(StaticValueProvider.of(topic));
+    public Read<T> fromTopic(String topic) {
+      return fromTopic(StaticValueProvider.of(topic));
     }
 
     /**
      * Like {@code topic()} but with a {@link ValueProvider}.
      */
-    public Read<T> topic(ValueProvider<String> topic) {
+    public Read<T> fromTopic(ValueProvider<String> topic) {
       if (topic.isAccessible()) {
         // Validate.
         PubsubTopic.fromPath(topic.get());
@@ -598,7 +598,7 @@ public class PubsubIO {
      *
      * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
      */
-    public Read<T> timestampLabel(String timestampLabel) {
+    public Read<T> withTimestampLabel(String timestampLabel) {
       return toBuilder().setTimestampLabel(timestampLabel).build();
     }
 
@@ -611,7 +611,7 @@ public class PubsubIO {
      * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
      * be delivered, and deduplication of the stream will be strictly best effort.
      */
-    public Read<T> idLabel(String idLabel) {
+    public Read<T> withIdLabel(String idLabel) {
       return toBuilder().setIdLabel(idLabel).build();
     }
 
@@ -628,7 +628,7 @@ public class PubsubIO {
      * A Coder for the output type T must be registered or set on the output via
      * {@link PCollection#setCoder(Coder)}.
      */
-    public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+    public Read<T> withParseFn(SimpleFunction<PubsubMessage, T> parseFn) {
       return toBuilder().setParseFn(parseFn).build();
     }
 
@@ -760,7 +760,7 @@ public class PubsubIO {
      * time classes, {@link Instant#Instant(long)} can be used to parse this value.
      *
      * <p>If the output from this sink is being read by another Beam pipeline, then
-     * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
+     * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
      * these timestamps from the appropriate attribute.
      */
     public Write<T> timestampLabel(String timestampLabel) {
@@ -773,7 +773,7 @@ public class PubsubIO {
      * opaque string.
      *
      * <p>If the the output from this sink is being read by another Beam pipeline, then
-     * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that* the other source reads
+     * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
      * these unique identifiers from the appropriate attribute.
      */
     public Write<T> idLabel(String idLabel) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 7fe6e26..f44fffc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -50,19 +50,19 @@ public class PubsubIOTest {
   @Test
   public void testPubsubIOGetName() {
     assertEquals("PubsubIO.Read",
-        PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
+        PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
     assertEquals("PubsubIO.Write",
         PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
   }
 
   @Test
   public void testTopicValidationSuccess() throws Exception {
-    PubsubIO.<String>read().topic("projects/my-project/topics/abc");
-    PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
-    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
-    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
-    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
-    PubsubIO.<String>read().topic(new StringBuilder()
+    PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc");
+    PubsubIO.<String>read().fromTopic("projects/my-project/topics/ABC");
+    PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-DeF");
+    PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234");
+    PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+    PubsubIO.<String>read().fromTopic(new StringBuilder()
         .append("projects/my-project/topics/A-really-long-one-")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -73,13 +73,13 @@ public class PubsubIOTest {
   @Test
   public void testTopicValidationBadCharacter() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
+    PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc-*-abc");
   }
 
   @Test
   public void testTopicValidationTooLong() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    PubsubIO.<String>read().topic(new StringBuilder().append
+    PubsubIO.<String>read().fromTopic(new StringBuilder().append
         ("projects/my-project/topics/A-really-long-one-")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -93,9 +93,9 @@ public class PubsubIOTest {
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .topic(StaticValueProvider.of(topic))
-        .timestampLabel("myTimestamp")
-        .idLabel("myId");
+        .fromTopic(StaticValueProvider.of(topic))
+        .withTimestampLabel("myTimestamp")
+        .withIdLabel("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
@@ -110,9 +110,9 @@ public class PubsubIOTest {
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .subscription(StaticValueProvider.of(subscription))
-        .timestampLabel("myTimestamp")
-        .idLabel("myId");
+        .fromSubscription(StaticValueProvider.of(subscription))
+        .withTimestampLabel("myTimestamp")
+        .withIdLabel("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
@@ -125,7 +125,7 @@ public class PubsubIOTest {
   public void testNullTopic() {
     String subscription = "projects/project/subscriptions/subscription";
     PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .subscription(StaticValueProvider.of(subscription));
+        .fromSubscription(StaticValueProvider.of(subscription));
     assertNull(read.getTopicProvider());
     assertNotNull(read.getSubscriptionProvider());
     assertNotNull(DisplayData.from(read));
@@ -135,7 +135,7 @@ public class PubsubIOTest {
   public void testNullSubscription() {
     String topic = "projects/project/topics/topic";
     PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .topic(StaticValueProvider.of(topic));
+        .fromTopic(StaticValueProvider.of(topic));
     assertNotNull(read.getTopicProvider());
     assertNull(read.getSubscriptionProvider());
     assertNotNull(DisplayData.from(read));
@@ -149,13 +149,13 @@ public class PubsubIOTest {
     PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
 
     // Reading from a subscription.
-    read = read.subscription("projects/project/subscriptions/subscription");
+    read = read.fromSubscription("projects/project/subscriptions/subscription");
     displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("PubsubIO.Read should include the subscription in its primitive display data",
         displayData, hasItem(hasDisplayItem("subscription")));
 
     // Reading from a topic.
-    read = read.topic("projects/project/topics/topic");
+    read = read.fromTopic("projects/project/topics/topic");
     displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("PubsubIO.Read should include the topic in its primitive display data",
         displayData, hasItem(hasDisplayItem("topic")));


[6/9] beam git commit: Adds PubsubIO.readStrings(), readProtos(), readAvros()

Posted by jk...@apache.org.
Adds PubsubIO.readStrings(), readProtos(), readAvros()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/079353d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/079353d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/079353d5

Branch: refs/heads/master
Commit: 079353d58c65141683e4640e425ee610001e7718
Parents: 42c975e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:50:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |  6 ++---
 .../examples/complete/game/LeaderBoard.java     |  6 ++---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 ++++++++++++++++++++
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |  4 ++-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |  3 +--
 5 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d95eb06..d628497 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,7 +24,6 @@ import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -252,9 +251,8 @@ public class GameStats extends LeaderBoard {
 
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
-        .apply(PubsubIO.<String>read()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
-            .withCoder(StringUtf8Coder.of()))
+        .apply(PubsubIO.readStrings()
+            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // Extract username/score pairs from the event stream

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index a87468a..fbffac6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,7 +27,6 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -191,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
     // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
-        .apply(PubsubIO.<String>read()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
-            .withCoder(StringUtf8Coder.of()))
+        .apply(PubsubIO.readStrings()
+            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     gameEvents.apply("CalculateTeamScores",

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 99df3c6..9604864 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,8 +30,11 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
@@ -461,6 +465,30 @@ public class PubsubIO {
     return new AutoValue_PubsubIO_Read.Builder<T>().build();
   }
 
+  /**
+   * Returns a {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud
+   * Pub/Sub stream; equivalent to {@link #read} configured with {@link StringUtf8Coder}.
+   */
+  public static Read<String> readStrings() {
+    return PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+  }
+
+  /**
+   * Returns a {@link PTransform} that continuously reads binary encoded protos of the given type
+   * from a Google Cloud Pub/Sub stream, configured with a {@link ProtoCoder} for that type.
+   */
+  public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
+    return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
+  }
+
+  /**
+   * Returns a {@link PTransform} that continuously reads binary encoded Avro messages of the
+   * given type from a Google Cloud Pub/Sub stream, using {@link AvroCoder} for {@code clazz}.
+   */
+  public static <T> Read<T> readAvros(Class<T> clazz) {
+    return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz));
+  }
+
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
   public static <T> Write<T> write() {
     return new AutoValue_PubsubIO_Write.Builder<T>().build();

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b16b665..6392fd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -82,7 +82,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A PTransform which streams messages from Pubsub.
+ * Users should use {@link PubsubIO#read} instead.
+ *
+ * <p>A PTransform which streams messages from Pubsub.
  * <ul>
  * <li>The underlying implementation in an {@link UnboundedSource} which receives messages
  * in batches and hands them out one at a time.

http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index f896bfc..20039d4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Set;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -146,7 +145,7 @@ public class PubsubIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData;
-    PubsubIO.Read<String> baseRead = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+    PubsubIO.Read<String> baseRead = PubsubIO.readStrings();
 
     // Reading from a subscription.
     PubsubIO.Read<String> read =