You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:01 UTC

[30/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
new file mode 100644
index 0000000..8fc1c19
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create
+ * and consume unbounded {@link PCollection PCollections}.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * Beam pipeline. Please refer to the documentation of corresponding
+ * {@link PipelineRunner PipelineRunners} for more details.
+ */
+public class PubsubIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
+
+  /** Factory for creating pubsub client to manage transport. */
+  private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
+
+  /**
+   * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
+   * IDs must start with a letter and may not end with a dash.
+   * This regex isn't exact - it allows patterns that would be rejected by the
+   * service, but it is sufficient for basic parsing of Pub/Sub topic and subscription paths.
+   */
+  private static final Pattern PROJECT_ID_REGEXP =
+      Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
+
+  private static final Pattern SUBSCRIPTION_REGEXP =
+      Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
+
+  private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
+
+  private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP =
+      Pattern.compile("/subscriptions/([^/]+)/(.+)");
+
+  private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
+
+  private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
+
+  private static final int PUBSUB_NAME_MIN_LENGTH = 3;
+  private static final int PUBSUB_NAME_MAX_LENGTH = 255;
+
+  private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
+  private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
+  private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
+
+  private static void validateProjectName(String project) {
+    Matcher match = PROJECT_ID_REGEXP.matcher(project);
+    if (!match.matches()) {
+      throw new IllegalArgumentException(
+          "Illegal project name specified in Pubsub subscription: " + project);
+    }
+  }
+
+  private static void validatePubsubName(String name) {
+    if (name.length() < PUBSUB_NAME_MIN_LENGTH) {
+      throw new IllegalArgumentException(
+          "Pubsub object name is shorter than 3 characters: " + name);
+    }
+    if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
+      throw new IllegalArgumentException(
+          "Pubsub object name is longer than 255 characters: " + name);
+    }
+
+    if (name.startsWith("goog")) {
+      throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
+    }
+
+    Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
+    if (!match.matches()) {
+      throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name
+          + " Please see Javadoc for naming rules.");
+    }
+  }
+
+  /**
+   * Populate common {@link DisplayData} between Pubsub source and sink.
+   */
+  private static void populateCommonDisplayData(DisplayData.Builder builder,
+      String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
+    builder
+        .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
+            .withLabel("Timestamp Label Attribute"))
+        .addIfNotNull(DisplayData.item("idLabel", idLabel)
+            .withLabel("ID Label Attribute"));
+
+    if (topic != null) {
+      String topicString = topic.isAccessible() ? topic.get().asPath()
+          : topic.toString();
+      builder.add(DisplayData.item("topic", topicString)
+          .withLabel("Pubsub Topic"));
+    }
+  }
+
+  /**
+   * Class representing a Pub/Sub message. Each message contains a single message payload and
+   * a map of attached attributes.
+   */
+  public static class PubsubMessage {
+
+    private byte[] message;
+    private Map<String, String> attributes;
+
+    public PubsubMessage(byte[] message, Map<String, String> attributes) {
+      this.message = message;
+      this.attributes = attributes;
+    }
+
+    /**
+     * Returns the main PubSub message.
+     */
+    public byte[] getMessage() {
+      return message;
+    }
+
+    /**
+     * Returns the given attribute value. If no such attribute exists, returns null.
+     */
+    @Nullable
+    public String getAttribute(String attribute) {
+      checkNotNull(attribute, "attribute");
+      return attributes.get(attribute);
+    }
+
+    /**
+     * Returns the attribute map given to the constructor, uncopied; callers must not modify it.
+     */
+    public Map<String, String> getAttributeMap() {
+      return attributes;
+    }
+  }
+
+  /**
+   * Class representing a Cloud Pub/Sub Subscription.
+   */
+  public static class PubsubSubscription implements Serializable {
+
+    private enum Type {NORMAL, FAKE}
+
+    private final Type type;
+    private final String project;
+    private final String subscription;
+
+    private PubsubSubscription(Type type, String project, String subscription) {
+      this.type = type;
+      this.project = project;
+      this.subscription = subscription;
+    }
+
+    /**
+     * Creates a class representing a Pub/Sub subscription from the specified subscription path.
+     *
+     * <p>Cloud Pub/Sub subscription names should be of the form
+     * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name
+     * of the project the subscription belongs to. The {@code <subscription>} component must comply
+     * with the following requirements:
+     *
+     * <ul>
+     * <li>Can only contain letters, numbers, dashes ('-'), underscores ('_'), periods ('.'),
+     * tildes ('~'), plus signs ('+') and percent signs ('%').</li>
+     * <li>Must be between 3 and 255 characters.</li>
+     * <li>Must begin with a letter.</li>
+     * <li>Must end with a letter or a number.</li>
+     * <li>Cannot begin with {@code 'goog'} prefix.</li>
+     * </ul>
+     */
+    public static PubsubSubscription fromPath(String path) {
+      if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)
+          || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) {
+        return new PubsubSubscription(Type.FAKE, "", path);
+      }
+
+      String projectName, subscriptionName;
+
+      Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
+      if (v1beta1Match.matches()) {
+        LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format "
+            + "projects/<project_id>/subscriptions/<subscription_name>");
+        projectName = v1beta1Match.group(1);
+        subscriptionName = v1beta1Match.group(2);
+      } else {
+        Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
+        if (!match.matches()) {
+          throw new IllegalArgumentException("Pubsub subscription is not in "
+              + "projects/<project_id>/subscriptions/<subscription_name> format: " + path);
+        }
+        projectName = match.group(1);
+        subscriptionName = match.group(2);
+      }
+
+      validateProjectName(projectName);
+      validatePubsubName(subscriptionName);
+      return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
+    }
+
+    /**
+     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
+     * v1beta1 API.
+     *
+     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta1Path() {
+      if (type == Type.NORMAL) {
+        return "/subscriptions/" + project + "/" + subscription;
+      } else {
+        return subscription;
+      }
+    }
+
+    /**
+     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
+     * v1beta2 API.
+     *
+     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta2Path() {
+      if (type == Type.NORMAL) {
+        return "projects/" + project + "/subscriptions/" + subscription;
+      } else {
+        return subscription;
+      }
+    }
+
+    /**
+     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
+     * API.
+     */
+    public String asPath() {
+      if (type == Type.NORMAL) {
+        return "projects/" + project + "/subscriptions/" + subscription;
+      } else {
+        return subscription;
+      }
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link PubsubSubscription}.
+   */
+  private static class SubscriptionTranslator
+      implements SerializableFunction<String, PubsubSubscription> {
+
+    @Override
+    public PubsubSubscription apply(String from) {
+      return PubsubSubscription.fromPath(from);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link SubscriptionPath}.
+   */
+  private static class SubscriptionPathTranslator
+      implements SerializableFunction<PubsubSubscription, SubscriptionPath> {
+
+    @Override
+    public SubscriptionPath apply(PubsubSubscription from) {
+      return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link PubsubTopic}.
+   */
+  private static class TopicTranslator
+      implements SerializableFunction<String, PubsubTopic> {
+
+    @Override
+    public PubsubTopic apply(String from) {
+      return PubsubTopic.fromPath(from);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link TopicPath}.
+   */
+  private static class TopicPathTranslator
+      implements SerializableFunction<PubsubTopic, TopicPath> {
+
+    @Override
+    public TopicPath apply(PubsubTopic from) {
+      return PubsubClient.topicPathFromName(from.project, from.topic);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link ProjectPath}.
+   */
+  private static class ProjectPathTranslator
+      implements SerializableFunction<PubsubTopic, ProjectPath> {
+
+    @Override
+    public ProjectPath apply(PubsubTopic from) {
+      return PubsubClient.projectPathFromId(from.project);
+    }
+  }
+
+  /**
+   * Class representing a Cloud Pub/Sub Topic.
+   */
+  public static class PubsubTopic implements Serializable {
+
+    private enum Type {NORMAL, FAKE}
+
+    private final Type type;
+    private final String project;
+    private final String topic;
+
+    private PubsubTopic(Type type, String project, String topic) {
+      this.type = type;
+      this.project = project;
+      this.topic = topic;
+    }
+
+    /**
+     * Creates a class representing a Cloud Pub/Sub topic from the specified topic path.
+     *
+     * <p>Cloud Pub/Sub topic names should be of the form
+     * {@code projects/<project>/topics/<topic>}, where {@code <project>} is the name of
+     * the publishing project. The {@code <topic>} component must comply with
+     * the following requirements:
+     *
+     * <ul>
+     * <li>Can only contain letters, numbers, dashes ('-'), underscores ('_'), periods ('.'),
+     * tildes ('~'), plus signs ('+') and percent signs ('%').</li>
+     * <li>Must be between 3 and 255 characters.</li>
+     * <li>Must begin with a letter.</li>
+     * <li>Must end with a letter or a number.</li>
+     * <li>Cannot begin with 'goog' prefix.</li>
+     * </ul>
+     */
+    public static PubsubTopic fromPath(String path) {
+      if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) {
+        return new PubsubTopic(Type.FAKE, "", path);
+      }
+
+      String projectName, topicName;
+
+      Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
+      if (v1beta1Match.matches()) {
+        LOG.warn("Saw topic in v1beta1 format.  Topics should be in the format "
+            + "projects/<project_id>/topics/<topic_name>");
+        projectName = v1beta1Match.group(1);
+        topicName = v1beta1Match.group(2);
+      } else {
+        Matcher match = TOPIC_REGEXP.matcher(path);
+        if (!match.matches()) {
+          throw new IllegalArgumentException(
+              "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
+        }
+        projectName = match.group(1);
+        topicName = match.group(2);
+      }
+
+      validateProjectName(projectName);
+      validatePubsubName(topicName);
+      return new PubsubTopic(Type.NORMAL, projectName, topicName);
+    }
+
+    /**
+     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
+     * v1beta1 API.
+     *
+     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta1Path() {
+      if (type == Type.NORMAL) {
+        return "/topics/" + project + "/" + topic;
+      } else {
+        return topic;
+      }
+    }
+
+    /**
+     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
+     * v1beta2 API.
+     *
+     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta2Path() {
+      if (type == Type.NORMAL) {
+        return "projects/" + project + "/topics/" + topic;
+      } else {
+        return topic;
+      }
+    }
+
+    /**
+     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
+     * API.
+     */
+    public String asPath() {
+      if (type == Type.NORMAL) {
+        return "projects/" + project + "/topics/" + topic;
+      } else {
+        return topic;
+      }
+    }
+  }
+
+  public static <T> Read<T> read() {
+    return new Read<>();
+  }
+
+  public static <T> Write<T> write() {
+    return new Write<>();
+  }
+
+  /**
+   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
+   * returns a {@link PCollection} of elements of type {@code T} decoded from the items in
+   * the stream.
+   */
+  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    /** The Cloud Pub/Sub topic to read from. */
+    @Nullable
+    private final ValueProvider<PubsubTopic> topic;
+
+    /** The Cloud Pub/Sub subscription to read from. */
+    @Nullable
+    private final ValueProvider<PubsubSubscription> subscription;
+
+    /** The name of the message attribute to read timestamps from. */
+    @Nullable
+    private final String timestampLabel;
+
+    /** The name of the message attribute to read unique message IDs from. */
+    @Nullable
+    private final String idLabel;
+
+    /** The coder used to decode each record. */
+    @Nullable
+    private final Coder<T> coder;
+
+    /** User function for parsing PubsubMessage object. */
+    SimpleFunction<PubsubMessage, T> parseFn;
+
+    private Read() {
+      this(null, null, null, null, null, null, null);
+    }
+
+    private Read(String name, ValueProvider<PubsubSubscription> subscription,
+        ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
+        String idLabel,
+        SimpleFunction<PubsubMessage, T> parseFn) {
+      super(name);
+      this.subscription = subscription;
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.coder = coder;
+      this.idLabel = idLabel;
+      this.parseFn = parseFn;
+    }
+
+    /**
+     * Returns a transform that's like this one but reading from the
+     * given subscription.
+     *
+     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
+     * of the {@code subscription} string.
+     *
+     * <p>Multiple readers reading from the same subscription will each receive
+     * some arbitrary portion of the data.  Most likely, separate readers should
+     * use their own subscriptions.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> subscription(String subscription) {
+      return subscription(StaticValueProvider.of(subscription));
+    }
+
+    /**
+     * Like {@code subscription()} but with a {@link ValueProvider}.
+     */
+    public Read<T> subscription(ValueProvider<String> subscription) {
+      if (subscription.isAccessible()) {
+        // Validate.
+        PubsubSubscription.fromPath(subscription.get());
+      }
+      return new Read<>(
+          name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+          null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
+    }
+
+    /**
+     * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
+     * with {@link #subscription(String)}.
+     *
+     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
+     * of the {@code topic} string.
+     *
+     * <p>The Beam runner will start reading data published on this topic from the time the
+     * pipeline is started. Any data published on the topic before the pipeline is started will
+     * not be read by the runner.
+     */
+    public Read<T> topic(String topic) {
+      return topic(StaticValueProvider.of(topic));
+    }
+
+    /**
+     * Like {@code topic()} but with a {@link ValueProvider}.
+     */
+    public Read<T> topic(ValueProvider<String> topic) {
+      if (topic.isAccessible()) {
+        // Validate.
+        PubsubTopic.fromPath(topic.get());
+      }
+      return new Read<>(name, null /* reset subscription to null */,
+          NestedValueProvider.of(topic, new TopicTranslator()),
+          timestampLabel, coder, idLabel, parseFn);
+    }
+
+    /**
+     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
+     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
+     * parameter specifies the name of the attribute that contains the timestamp.
+     *
+     * <p>The timestamp value is expected to be represented in the attribute as either:
+     *
+     * <ul>
+     * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
+     * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
+     * value for this attribute.
+     * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
+     * sub-second component of the timestamp is optional, and digits beyond the first three
+     * (i.e., time units smaller than milliseconds) will be ignored.
+     * </ul>
+     *
+     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+     * the first time it sees each record. All windowing will be done relative to these
+     * timestamps.
+     *
+     * <p>By default, windows are emitted based on an estimate of when this source is likely
+     * done producing data for a given timestamp (referred to as the Watermark; see
+     * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
+     * specified with the windowing strategy &ndash; by default it will be output immediately.
+     *
+     * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
+     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+     *
+     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
+     */
+    public Read<T> timestampLabel(String timestampLabel) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
+     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
+     * parameter specifies the attribute name. The value of the attribute can be any string
+     * that uniquely identifies this record.
+     *
+     * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
+     * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+     * be delivered, and deduplication of the stream will be strictly best effort.
+     */
+    public Read<T> idLabel(String idLabel) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Returns a transform that's like this one but that uses the given
+     * {@link Coder} to decode each record into a value of type {@code T}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Causes the source to return a PubsubMessage that includes Pubsub attributes.
+     * The user must supply a parsing function to transform the PubsubMessage into an output type.
+     * A Coder for the output type T must also be supplied via {@link #withCoder(Coder)};
+     * {@code expand} rejects a transform whose coder is unset.
+     */
+    public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+      return new Read<T>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (topic == null && subscription == null) {
+        throw new IllegalStateException("Need to set either the topic or the subscription for "
+            + "a PubsubIO.Read transform");
+      }
+      if (topic != null && subscription != null) {
+        throw new IllegalStateException("Can't set both the topic and the subscription for "
+            + "a PubsubIO.Read transform");
+      }
+      if (coder == null) {
+        throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
+            + "the withCoder method.");
+      }
+
+      @Nullable ValueProvider<ProjectPath> projectPath =
+          topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
+      @Nullable ValueProvider<TopicPath> topicPath =
+          topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
+      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
+          subscription == null
+              ? null
+              : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
+      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
+              FACTORY, projectPath, topicPath, subscriptionPath,
+              coder, timestampLabel, idLabel, parseFn);
+      return input.getPipeline().apply(source);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+
+      if (subscription != null) {
+        String subscriptionString = subscription.isAccessible()
+            ? subscription.get().asPath() : subscription.toString();
+        builder.add(DisplayData.item("subscription", subscriptionString)
+            .withLabel("Pubsub Subscription"));
+      }
+    }
+
+    @Override
+    protected Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    /**
+     * Get the topic being read from.
+     */
+    @Nullable
+    public PubsubTopic getTopic() {
+      return topic == null ? null : topic.get();
+    }
+
+    /**
+     * Get the {@link ValueProvider} for the topic being read from.
+     */
+    public ValueProvider<PubsubTopic> getTopicProvider() {
+      return topic;
+    }
+
+    /**
+     * Get the subscription being read from.
+     */
+    @Nullable
+    public PubsubSubscription getSubscription() {
+      return subscription == null ? null : subscription.get();
+    }
+
+    /**
+     * Get the {@link ValueProvider} for the subscription being read from.
+     */
+    public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
+      return subscription;
+    }
+
+    /**
+     * Get the timestamp label.
+     */
+    @Nullable
+    public String getTimestampLabel() {
+      return timestampLabel;
+    }
+
+    /**
+     * Get the id label.
+     */
+    @Nullable
+    public String getIdLabel() {
+      return idLabel;
+    }
+
+
+    /**
+     * Get the {@link Coder} used for the transform's output.
+     */
+    @Nullable
+    public Coder<T> getCoder() {
+      return coder;
+    }
+
+    /**
+     * Get the parse function used for PubSub attributes.
+     */
+    @Nullable
+    public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
+      return parseFn;
+    }
+
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Disallow construction of utility class. */
+  private PubsubIO() {}
+
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} of elements of type {@code T}
+   * to a Cloud Pub/Sub stream.
+   */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+
+    /** The Cloud Pub/Sub topic to publish to. */
+    @Nullable
+    private final ValueProvider<PubsubTopic> topic;
+    /** The name of the message attribute to publish message timestamps in. */
+    @Nullable
+    private final String timestampLabel;
+    /** The name of the message attribute to publish unique message IDs in. */
+    @Nullable
+    private final String idLabel;
+    /** The input type Coder. */
+    private final Coder<T> coder;
+    /** The function that formats each input element as a {@link PubsubMessage}. */
+    SimpleFunction<T, PubsubMessage> formatFn;
+
+    private Write() {
+      this(null, null, null, null, null, null);
+    }
+
+    private Write(
+        String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
+        String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
+      super(name);
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.idLabel = idLabel;
+      this.coder = coder;
+      this.formatFn = formatFn;
+    }
+
+    /**
+     * Creates a transform that publishes to the specified topic.
+     *
+     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
+     * {@code topic} string.
+     */
+    public Write<T> topic(String topic) {
+      return topic(StaticValueProvider.of(topic));
+    }
+
+    /**
+     * Like {@code topic()} but with a {@link ValueProvider}.
+     */
+    public Write<T> topic(ValueProvider<String> topic) {
+      return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
+          timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Creates a transform that writes to Pub/Sub, adding each record's timestamp to the published
+     * messages in an attribute with the specified name. The value of the attribute will be a number
+     * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
+     * time classes, {@link Instant#Instant(long)} can be used to parse this value.
+     *
+     * <p>If the output from this sink is being read by another Beam pipeline, then
+     * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
+     * these timestamps from the appropriate attribute.
+     */
+    public Write<T> timestampLabel(String timestampLabel) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
+     * published messages in an attribute with the specified name. The value of the attribute is an
+     * opaque string.
+     *
+     * <p>If the output from this sink is being read by another Beam pipeline, then
+     * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads
+     * these unique identifiers from the appropriate attribute.
+     */
+    public Write<T> idLabel(String idLabel) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Returns a new transform that's like this one
+     * but that uses the given {@link Coder} to encode each of
+     * the elements of the input {@link PCollection} into an
+     * output record.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write<T> withCoder(Coder<T> coder) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Used to write a PubSub message together with PubSub attributes. The user-supplied format
+     * function translates the input type T to a PubsubMessage object, which is used by the sink
+     * to separately set the PubSub message's payload and attributes.
+     */
+    public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
+      return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (topic == null) {
+        throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
+      }
+      switch (input.isBounded()) {
+        case BOUNDED:
+          input.apply(ParDo.of(new PubsubBoundedWriter()));
+          return PDone.in(input.getPipeline());
+        case UNBOUNDED:
+          return input.apply(new PubsubUnboundedSink<T>(
+              FACTORY,
+              NestedValueProvider.of(topic, new TopicPathTranslator()),
+              coder,
+              timestampLabel,
+              idLabel,
+              formatFn,
+              100 /* numShards */));
+      }
+      throw new RuntimeException(); // cases are exhaustive.
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+    }
+
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+
+    /**
+     * Returns the PubSub topic being written to.
+     */
+    @Nullable
+    public PubsubTopic getTopic() {
+      return (topic == null) ? null : topic.get();
+    }
+
+    /**
+     * Returns the {@link ValueProvider} for the topic being written to.
+     */
+    @Nullable
+    public ValueProvider<PubsubTopic> getTopicProvider() {
+      return topic;
+    }
+
+    /**
+     * Returns the timestamp label.
+     */
+    @Nullable
+    public String getTimestampLabel() {
+      return timestampLabel;
+    }
+
+    /**
+     * Returns the id label.
+     */
+    @Nullable
+    public String getIdLabel() {
+      return idLabel;
+    }
+
+    /**
+     * Returns the output coder.
+     */
+    @Nullable
+    public Coder<T> getCoder() {
+      return coder;
+    }
+
+    /**
+     * Returns the formatting function used if publishing attributes.
+     */
+    @Nullable
+    public SimpleFunction<T, PubsubMessage> getFormatFn() {
+      return formatFn;
+    }
+
+    /**
+     * Writer to Pubsub which batches messages from bounded collections.
+     *
+     * <p>Elements are buffered per bundle and published in batches of at most
+     * {@code MAX_PUBLISH_BATCH_SIZE} messages; any remainder is flushed when the bundle finishes.
+     *
+     * <p>Public so can be suppressed by runners.
+     */
+    public class PubsubBoundedWriter extends DoFn<T, Void> {
+
+      private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+      private transient List<OutgoingMessage> output;
+      private transient PubsubClient pubsubClient;
+
+      /** Starts an empty batch and creates the Pubsub client used for this bundle. */
+      @StartBundle
+      public void startBundle(Context c) throws IOException {
+        this.output = new ArrayList<>();
+        // NOTE: idLabel is ignored.
+        this.pubsubClient =
+            FACTORY.newClient(timestampLabel, null,
+                c.getPipelineOptions().as(PubsubOptions.class));
+      }
+
+      /**
+       * Converts the element to an outgoing message and buffers it, publishing the buffer once
+       * it holds {@code MAX_PUBLISH_BATCH_SIZE} messages.
+       *
+       * <p>When a format function is configured it supplies both payload and attributes;
+       * otherwise the element is encoded with the configured coder and has no attributes.
+       */
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        byte[] payload;
+        Map<String, String> attributes = null;
+        if (formatFn != null) {
+          PubsubMessage formatted = formatFn.apply(c.element());
+          payload = formatted.getMessage();
+          attributes = formatted.getAttributeMap();
+        } else {
+          payload = CoderUtils.encodeToByteArray(getCoder(), c.element());
+        }
+        // NOTE: The record id is always null.
+        output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));
+
+        if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
+          publish();
+        }
+      }
+
+      /**
+       * Publishes any buffered messages and releases the client.
+       *
+       * <p>The client is closed and the per-bundle state cleared even if the final publish
+       * throws, so a failing bundle does not leak the client.
+       */
+      @FinishBundle
+      public void finishBundle(Context c) throws IOException {
+        try {
+          if (!output.isEmpty()) {
+            publish();
+          }
+        } finally {
+          output = null;
+          // Clear the field before closing so it is reset even if close() itself throws.
+          PubsubClient client = pubsubClient;
+          pubsubClient = null;
+          client.close();
+        }
+      }
+
+      /**
+       * Publishes the buffered messages to the configured topic and empties the buffer.
+       *
+       * @throws IllegalStateException if the client reports a different number of published
+       *     messages than were buffered
+       */
+      private void publish() throws IOException {
+        // Resolve the topic once rather than once per field access.
+        PubsubTopic pubsubTopic = getTopic();
+        int n = pubsubClient.publish(
+            PubsubClient.topicPathFromName(pubsubTopic.project, pubsubTopic.topic),
+            output);
+        checkState(n == output.size(),
+            "Expected %s messages to be published but %s were", output.size(), n);
+        output.clear();
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.delegate(Write.this);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
new file mode 100644
index 0000000..e290a6b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+
+/**
+ * A Pubsub client using JSON transport.
+ */
+class PubsubJsonClient extends PubsubClient {
+
+  private static class PubsubJsonClientFactory implements PubsubClientFactory {
+    private static HttpRequestInitializer chainHttpRequestInitializer(
+        Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+      if (credential == null) {
+        return httpRequestInitializer;
+      } else {
+        return new ChainingHttpRequestInitializer(
+            new HttpCredentialsAdapter(credential),
+            httpRequestInitializer);
+      }
+    }
+
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        throws IOException {
+      Pubsub pubsub = new Builder(
+          Transport.getTransport(),
+          Transport.getJsonFactory(),
+          chainHttpRequestInitializer(
+              options.getGcpCredential(),
+              // Do not log 404. It clutters the output and is possibly even required by the caller.
+              new RetryHttpRequestInitializer(ImmutableList.of(404))))
+          .setRootUrl(options.getPubsubRootUrl())
+          .setApplicationName(options.getAppName())
+          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
+          .build();
+      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+    }
+
+    @Override
+    public String getKind() {
+      return "Json";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using Json transport.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Underlying JSON transport.
+   */
+  private Pubsub pubsub;
+
+  @VisibleForTesting
+  PubsubJsonClient(
+      @Nullable String timestampLabel,
+      @Nullable String idLabel,
+      Pubsub pubsub) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.pubsub = pubsub;
+  }
+
+  @Override
+  public void close() {
+    // Nothing to close.
+  }
+
+  /**
+   * Publishes {@code outgoingMessages} to {@code topic} in a single request.
+   *
+   * <p>When a timestamp and/or id label is configured, the corresponding attribute is added to
+   * each message. The attributes are written to a copy, so the map held by the
+   * {@link OutgoingMessage} is never modified (it may be immutable).
+   *
+   * @return the number of message ids in the response, or 0 if the response carries none
+   */
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
+
+      Map<String, String> attributes = outgoingMessage.attributes;
+      if (timestampLabel != null || idLabel != null) {
+        // We are about to add entries: work on our own map rather than the caller's.
+        attributes = (attributes == null)
+            ? new TreeMap<String, String>()
+            : new TreeMap<String, String>(attributes);
+      }
+
+      if (timestampLabel != null) {
+        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        attributes.put(idLabel, outgoingMessage.recordId);
+      }
+
+      if (attributes != null) {
+        pubsubMessage.setAttributes(attributes);
+      }
+
+      pubsubMessages.add(pubsubMessage);
+    }
+    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
+    PublishResponse response = pubsub.projects()
+                                     .topics()
+                                     .publish(topic.getPath(), request)
+                                     .execute();
+    // An absent field comes back as null rather than an empty list.
+    return (response.getMessageIds() == null) ? 0 : response.getMessageIds().size();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = new PullRequest()
+        .setReturnImmediately(returnImmediately)
+        .setMaxMessages(batchSize);
+    PullResponse response = pubsub.projects()
+                                  .subscriptions()
+                                  .pull(subscription.getPath(), request)
+                                  .execute();
+    if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
+    for (ReceivedMessage message : response.getReceivedMessages()) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.decodeData();
+
+      // Timestamp.
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any.
+      @Nullable String recordId = null;
+      if (idLabel != null && attributes != null) {
+        recordId = attributes.get(idLabel);
+      }
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+                                               requestTimeMsSinceEpoch, ackId, recordId));
+    }
+
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
+    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+    pubsub.projects()
+          .subscriptions()
+          .acknowledge(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest request =
+        new ModifyAckDeadlineRequest().setAckIds(ackIds)
+                                      .setAckDeadlineSeconds(deadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .modifyAckDeadline(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .create(topic.getPath(), new Topic())
+          .execute(); // ignore Topic result.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .delete(topic.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  /**
+   * Lists all topics in {@code project}, following {@code nextPageToken} until the service
+   * reports no further pages.
+   */
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    Pubsub.Projects.Topics.List request = pubsub.projects().topics().list(project.getPath());
+    List<TopicPath> topics = new ArrayList<>();
+    String nextPageToken;
+    do {
+      ListTopicsResponse response = request.execute();
+      // A page with no topics comes back with a null list.
+      if (response.getTopics() != null) {
+        for (Topic topic : response.getTopics()) {
+          topics.add(topicPathFromPath(topic.getName()));
+        }
+      }
+      nextPageToken = response.getNextPageToken();
+      request.setPageToken(nextPageToken);
+    } while (!Strings.isNullOrEmpty(nextPageToken));
+    return topics;
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription request = new Subscription()
+        .setTopic(topic.getPath())
+        .setAckDeadlineSeconds(ackDeadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .create(subscription.getPath(), request)
+          .execute(); // ignore Subscription result.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    pubsub.projects()
+          .subscriptions()
+          .delete(subscription.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  /**
+   * Lists the subscriptions in {@code project} that are attached to {@code topic}, following
+   * {@code nextPageToken} until the service reports no further pages.
+   */
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException {
+    Pubsub.Projects.Subscriptions.List request =
+        pubsub.projects().subscriptions().list(project.getPath());
+    List<SubscriptionPath> subscriptions = new ArrayList<>();
+    String nextPageToken;
+    do {
+      ListSubscriptionsResponse response = request.execute();
+      // A page with no subscriptions comes back with a null list.
+      if (response.getSubscriptions() != null) {
+        for (Subscription subscription : response.getSubscriptions()) {
+          // Compare from the known non-null side so a subscription without a topic is skipped.
+          if (topic.getPath().equals(subscription.getTopic())) {
+            subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+          }
+        }
+      }
+      nextPageToken = response.getNextPageToken();
+      request.setPageToken(nextPageToken);
+    } while (!Strings.isNullOrEmpty(nextPageToken));
+    return subscriptions;
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+    return response.getAckDeadlineSeconds();
+  }
+
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
new file mode 100644
index 0000000..c88576e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
+ */
+class PubsubTestClient extends PubsubClient implements Serializable {
+  /**
+   * Mimic the state of the simulated Pubsub 'service'.
+   *
+   * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
+   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
+   * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
+   * following data structures over all clients and must resort to a static.
+   */
+  private static class State {
+    /**
+     * True if has been primed for a test but not yet validated.
+     */
+    boolean isActive;
+
+    /**
+     * Publish mode only: Only publish calls for this topic are allowed.
+     */
+    @Nullable
+    TopicPath expectedTopic;
+
+    /**
+     * Publish mode only: Messages yet to be seen in a {@link #publish} call.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+    /**
+     * Publish mode only: Messages which should throw when first sent to simulate transient publish
+     * failure.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingFailingOutgoingMessages;
+
+    /**
+     * Pull mode only: Clock from which to get current time.
+     */
+    @Nullable
+    Clock clock;
+
+    /**
+     * Pull mode only: Only pull calls for this subscription are allowed.
+     */
+    @Nullable
+    SubscriptionPath expectedSubscription;
+
+    /**
+     * Pull mode only: Timeout to simulate.
+     */
+    int ackTimeoutSec;
+
+    /**
+     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
+     */
+    @Nullable
+    List<IncomingMessage> remainingPendingIncomingMessages;
+
+    /**
+     * Pull mode only: Messages which have been returned from a {@link #pull} call and
+     * not yet ACKed by an {@link #acknowledge} call.
+     */
+    @Nullable
+    Map<String, IncomingMessage> pendingAckIncomingMessages;
+
+    /**
+     * Pull mode only: When above messages are due to have their ACK deadlines expire.
+     */
+    @Nullable
+    Map<String, Long> ackDeadline;
+  }
+
+  private static final State STATE = new State();
+
+  /** Closing the factory will validate all expected messages were processed. */
+  public interface PubsubTestClientFactory
+          extends PubsubClientFactory, Closeable, Serializable {
+  }
+
+  /**
+   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point final validation will
+   * occur.
+   *
+   * <p>Closing the factory also clears all publish-mode state, so that a later test does not
+   * observe a stale expected topic.
+   *
+   * @param expectedTopic the only topic publish calls are allowed to target
+   * @param expectedOutgoingMessages messages that must all be published before the factory closes
+   * @param failingOutgoingMessages messages whose first publish attempt throws
+   */
+  static PubsubTestClientFactory createFactoryForPublish(
+      final TopicPath expectedTopic,
+      final Iterable<OutgoingMessage> expectedOutgoingMessages,
+      final Iterable<OutgoingMessage> failingOutgoingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      STATE.expectedTopic = expectedTopic;
+      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PublishTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
+                     "Still waiting for %s messages to be published",
+                     STATE.remainingExpectedOutgoingMessages.size());
+          STATE.isActive = false;
+          // Reset every publish-mode field; a leftover expectedTopic would make a later
+          // pull test also report publish mode.
+          STATE.expectedTopic = null;
+          STATE.remainingExpectedOutgoingMessages = null;
+          STATE.remainingFailingOutgoingMessages = null;
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point it validates that every
+   * message was pulled and ACKed.
+   *
+   * <p>Closing the factory also clears the pull-mode state, so that a later test does not observe
+   * a stale expected subscription or clock.
+   *
+   * @param clock source of the simulated current time
+   * @param expectedSubscription the only subscription pull calls are allowed to target
+   * @param ackTimeoutSec ACK timeout to simulate, in seconds
+   * @param expectedIncomingMessages messages made available to pull calls
+   */
+  public static PubsubTestClientFactory createFactoryForPull(
+      final Clock clock,
+      final SubscriptionPath expectedSubscription,
+      final int ackTimeoutSec,
+      final Iterable<IncomingMessage> expectedIncomingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      STATE.clock = clock;
+      STATE.expectedSubscription = expectedSubscription;
+      STATE.ackTimeoutSec = ackTimeoutSec;
+      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
+      STATE.pendingAckIncomingMessages = new HashMap<>();
+      STATE.ackDeadline = new HashMap<>();
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PullTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
+                     "Still waiting for %s messages to be pulled",
+                     STATE.remainingPendingIncomingMessages.size());
+          checkState(STATE.pendingAckIncomingMessages.isEmpty(),
+                     "Still waiting for %s messages to be ACKed",
+                     STATE.pendingAckIncomingMessages.size());
+          checkState(STATE.ackDeadline.isEmpty(),
+                     "Still waiting for %s messages to be ACKed",
+                     STATE.ackDeadline.size());
+          STATE.isActive = false;
+          // Reset the pull-mode fields; a leftover expectedSubscription would make a later
+          // publish test also report pull mode.
+          STATE.clock = null;
+          STATE.expectedSubscription = null;
+          STATE.remainingPendingIncomingMessages = null;
+          STATE.pendingAckIncomingMessages = null;
+          STATE.ackDeadline = null;
+        }
+      }
+    };
+  }
+
+  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
+    return new PubsubTestClientFactory() {
+      int numCalls = 0;
+
+      @Override
+      public void close() throws IOException {
+        checkState(
+            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
+      }
+
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient() {
+          @Override
+          public void createSubscription(
+              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
+              throws IOException {
+            checkState(numCalls == 0, "Expected at most one subscription to be created");
+            numCalls++;
+          }
+        };
+      }
+
+      @Override
+      public String getKind() {
+        return "CreateSubscriptionTest";
+      }
+    };
+  }
+
+  /**
+   * Return true if in pull mode.
+   */
+  private boolean inPullMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedSubscription != null;
+  }
+
+  /**
+   * Return true if in publish mode.
+   */
+  private boolean inPublishMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedTopic != null;
+  }
+
+  /**
+   * For pull mode only: Track progression of time according to the {@link Clock} passed to
+   * {@link #createFactoryForPull}. This simulates Pubsub expiring outstanding ACKs: every message
+   * whose ACK deadline is at or before the current clock time is moved back to the list of
+   * messages waiting to be pulled.
+   */
+  public void advance() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only advance in pull mode");
+      // Any messages whose ACKs timed out are available for re-pulling.
+      Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
+      while (deadlineItr.hasNext()) {
+        Map.Entry<String, Long> entry = deadlineItr.next();
+        if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
+          STATE.remainingPendingIncomingMessages.add(
+              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
+          // Remove through the iterator so the map is not modified behind its back.
+          deadlineItr.remove();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public int publish(
+      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+    synchronized (STATE) {
+      checkState(inPublishMode(), "Can only publish in publish mode");
+      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
+                 STATE.expectedTopic);
+      for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
+          throw new RuntimeException("Simulating failure for " + outgoingMessage);
+        }
+        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
+                   "Unexpected outgoing message %s", outgoingMessage);
+      }
+      return outgoingMessages.size();
+    }
+  }
+
+  /**
+   * Simulates a pull: moves pending messages into the awaiting-ACK set, stamps them with the
+   * request time, and records their ACK deadline. Stops once {@code batchSize} messages have
+   * been collected.
+   *
+   * <p>The request time must equal the simulated clock's current time, the subscription must be
+   * the expected one, and {@code returnImmediately} must be true.
+   */
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+      boolean returnImmediately) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only pull in pull mode");
+      long now = STATE.clock.currentTimeMillis();
+      checkState(requestTimeMsSinceEpoch == now,
+                 "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+      checkState(returnImmediately, "Pull only supported if returning immediately");
+
+      // Multiply as long: int arithmetic would overflow for large timeouts before widening.
+      long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000L;
+
+      List<IncomingMessage> incomingMessages = new ArrayList<>();
+      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
+      while (pendItr.hasNext()) {
+        IncomingMessage incomingMessage = pendItr.next();
+        pendItr.remove();
+        IncomingMessage incomingMessageWithRequestTime =
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+        incomingMessages.add(incomingMessageWithRequestTime);
+        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
+                                             incomingMessageWithRequestTime);
+        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId, deadlineMsSinceEpoch);
+        if (incomingMessages.size() >= batchSize) {
+          break;
+        }
+      }
+      return incomingMessages;
+    }
+  }
+
+  @Override
+  public void acknowledge(
+      SubscriptionPath subscription,
+      List<String> ackIds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only acknowledge in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        checkState(STATE.ackDeadline.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+      }
+    }
+  }
+
+  /**
+   * Simulates changing the ACK deadline of outstanding messages.
+   *
+   * <p>A positive {@code deadlineSeconds} sets the deadline to that many seconds after the
+   * simulated clock's current time. Otherwise the message is returned to the list of messages
+   * waiting to be pulled. Each ACK id must refer to a message currently awaiting an ACK.
+   */
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        // Both outcomes discard the previous deadline, which must have existed.
+        checkState(STATE.ackDeadline.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+        if (deadlineSeconds > 0) {
+          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          // Multiply as long: int arithmetic would overflow for large deadlines.
+          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000L);
+        } else {
+          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
+          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
+          STATE.remainingPendingIncomingMessages.add(message);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(
+      ProjectPath project, TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    synchronized (STATE) {
+      return STATE.ackTimeoutSec;
+    }
+  }
+
+  @Override
+  public boolean isEOF() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only check EOF in pull mode");
+      return STATE.remainingPendingIncomingMessages.isEmpty();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
new file mode 100644
index 0000000..3ce9224
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
+
+/**
+ * A PTransform which streams messages to Pubsub.
+ * <ul>
+ * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which
+ * publishes as a side effect. (In the future we want to design and switch to a custom
+ * {@code UnboundedSink} implementation so as to gain access to system watermark and
+ * end-of-pipeline cleanup.)
+ * <li>We try to send messages in batches while also limiting send latency.
+ * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
+ * <li>Though some background threads are used by the underlying netty system all actual Pubsub
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * to execute concurrently and hide latency.
+ * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
+ * to dedup messages.
+ * </ul>
+ */
+public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
+  /** Upper bound on the number of messages in one publish call, unless overridden. */
+  private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1_000;
+
+  /** Upper bound on the size, in bytes, of one publish batch, unless overridden. */
+  private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400_000;
+
+  /** Longest a received message may wait before being pushed to Pubsub, unless overridden. */
+  private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
+
+  /**
+   * Coder used to move {@link OutgoingMessage}s between the internal stages of this transform.
+   * Fields are written, and read back, in the order: element bytes, attributes, timestamp,
+   * record id. Attributes and record id may be null.
+   */
+  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
+    private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
+        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+    private static final NullableCoder<String> RECORD_ID_CODER =
+        NullableCoder.of(StringUtf8Coder.of());
+
+    @Override
+    public void encode(
+        OutgoingMessage value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      // Every component, including the last one, is written using the nested context.
+      Context nested = context.nested();
+      ByteArrayCoder.of().encode(value.elementBytes, outStream, nested);
+      ATTRIBUTES_CODER.encode(value.attributes, outStream, nested);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, nested);
+      RECORD_ID_CODER.encode(value.recordId, outStream, nested);
+    }
+
+    @Override
+    public OutgoingMessage decode(
+        InputStream inStream, Context context) throws CoderException, IOException {
+      // Mirror of encode: same component order, same nested context.
+      Context nested = context.nested();
+      byte[] payload = ByteArrayCoder.of().decode(inStream, nested);
+      Map<String, String> attributeMap = ATTRIBUTES_CODER.decode(inStream, nested);
+      long timestampMs = BigEndianLongCoder.of().decode(inStream, nested);
+      @Nullable String id = RECORD_ID_CODER.decode(inStream, nested);
+      return new OutgoingMessage(payload, attributeMap, timestampMs, id);
+    }
+  }
+
+  /** Coder instance for {@link OutgoingMessage}, exposed to tests. */
+  @VisibleForTesting
+  static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
+
+  // ================================================================================
+  // RecordIdMethod
+  // ================================================================================
+
+  /**
+   * Specify how record ids are to be generated for outgoing messages.
+   */
+  @VisibleForTesting
+  enum RecordIdMethod {
+    /** Leave the record id null. */
+    NONE,
+    /** Generate a random id per element (a {@link UUID} string). */
+    RANDOM,
+    /** Derive the id by hashing the element bytes. For testing only. */
+    DETERMINISTIC
+  }
+
+  // ================================================================================
+  // ShardFn
+  // ================================================================================
+
+  /**
+   * Turn each input element into an {@link OutgoingMessage} and key it by a randomly chosen
+   * shard number in {@code [0, numShards)}.
+   */
+  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
+    private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
+    private final Coder<T> elementCoder;
+    private final int numShards;
+    private final RecordIdMethod recordIdMethod;
+    private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
+
+    ShardFn(Coder<T> elementCoder, int numShards,
+            SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, RecordIdMethod recordIdMethod) {
+      this.elementCoder = elementCoder;
+      this.numShards = numShards;
+      this.formatFn = formatFn;
+      this.recordIdMethod = recordIdMethod;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      elementCounter.inc();
+
+      final byte[] payload;
+      final Map<String, String> attributes;
+      if (formatFn == null) {
+        // No format function: encode the element itself and send it with no attributes.
+        payload = CoderUtils.encodeToByteArray(elementCoder, c.element());
+        attributes = ImmutableMap.<String, String>of();
+      } else {
+        // The format function supplies both the payload and the attributes.
+        PubsubIO.PubsubMessage formatted = formatFn.apply(c.element());
+        payload = formatted.getMessage();
+        attributes = formatted.getAttributeMap();
+      }
+
+      long timestampMs = c.timestamp().getMillis();
+      @Nullable String recordId = newRecordId(payload);
+      int shard = ThreadLocalRandom.current().nextInt(numShards);
+      c.output(KV.of(shard, new OutgoingMessage(payload, attributes, timestampMs, recordId)));
+    }
+
+    /**
+     * Produce the record id for a message with the given payload, as selected by
+     * {@link #recordIdMethod}. Returns null when no id is to be generated.
+     */
+    @Nullable
+    private String newRecordId(byte[] payload) {
+      switch (recordIdMethod) {
+        case DETERMINISTIC:
+          return Hashing.murmur3_128().hashBytes(payload).toString();
+        case RANDOM:
+          // Since these elements go through a GroupByKey, any failures while sending to
+          // Pubsub will be retried without falling back and generating a new record id.
+          // Thus even though we may send the same message to Pubsub twice, it is guaranteed
+          // to have the same record id.
+          return UUID.randomUUID().toString();
+        case NONE:
+        default:
+          return null;
+      }
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("numShards", numShards));
+    }
+  }
+
+  // ================================================================================
+  // WriterFn
+  // ================================================================================
+
+  /**
+   * Publish messages to Pubsub in batches. Each input element is one shard's group of messages;
+   * the group is split into publish calls so that no call carries more than
+   * {@code publishBatchBytes} of element bytes (unless a single message alone exceeds it).
+   */
+  private static class WriterFn
+      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+    private final PubsubClientFactory pubsubFactory;
+    private final ValueProvider<TopicPath> topic;
+    private final String timestampLabel;
+    private final String idLabel;
+    private final int publishBatchSize;
+    private final int publishBatchBytes;
+
+    /**
+     * Client on which to talk to Pubsub. Null until created by {@link #startBundle}, and reset
+     * to null by {@link #finishBundle}.
+     */
+    @Nullable
+    private transient PubsubClient pubsubClient;
+
+    private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
+    private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements");
+    private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes");
+
+    WriterFn(
+        PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
+        String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
+      this.pubsubFactory = pubsubFactory;
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.idLabel = idLabel;
+      this.publishBatchSize = publishBatchSize;
+      this.publishBatchBytes = publishBatchBytes;
+    }
+
+    /**
+     * BLOCKING
+     * Send {@code messages} as a batch to Pubsub and update the batch, element and byte
+     * counters.
+     *
+     * @param messages the batch to publish
+     * @param bytes total element bytes in {@code messages}, used only for the byte counter
+     * @throws IllegalStateException if the client reports fewer published messages than given
+     */
+    private void publishBatch(List<OutgoingMessage> messages, int bytes)
+        throws IOException {
+      int n = pubsubClient.publish(topic.get(), messages);
+      checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful",
+                 messages.size(), n);
+      batchCounter.inc();
+      elementCounter.inc(messages.size());
+      byteCounter.inc(bytes);
+    }
+
+    /** Create the Pubsub client for this bundle. */
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
+      pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+                                             c.getPipelineOptions().as(PubsubOptions.class));
+    }
+
+    /** Publish one shard's group of messages, splitting it into batches by byte size. */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
+      int bytes = 0;
+      for (OutgoingMessage message : c.element().getValue()) {
+        if (!pubsubMessages.isEmpty()
+            && bytes + message.elementBytes.length > publishBatchBytes) {
+          // Break large (in bytes) batches into smaller.
+          // (The number of messages is bounded upstream by the element-count trigger of the
+          // enclosing transform, though a group may run slightly over publishBatchSize.
+          // We'll consider that ok since the hard limit from Pubsub is by bytes rather than
+          // number of messages.)
+          // BLOCKS until published.
+          publishBatch(pubsubMessages, bytes);
+          pubsubMessages.clear();
+          bytes = 0;
+        }
+        pubsubMessages.add(message);
+        bytes += message.elementBytes.length;
+      }
+      if (!pubsubMessages.isEmpty()) {
+        // BLOCKS until published.
+        publishBatch(pubsubMessages, bytes);
+      }
+    }
+
+    /** Close the Pubsub client created by {@link #startBundle} and forget it. */
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      try {
+        pubsubClient.close();
+      } finally {
+        // Drop the reference even if close() fails, so that a later startBundle on this
+        // instance is not rejected by its checkState.
+        pubsubClient = null;
+      }
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      String topicString =
+          topic == null ? null
+          : topic.isAccessible() ? topic.get().getPath()
+          : topic.toString();
+      // topicString is null when no topic was supplied, so register it only if present.
+      builder.addIfNotNull(DisplayData.item("topic", topicString));
+      builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+    }
+  }
+
+  // ================================================================================
+  // PubsubUnboundedSink
+  // ================================================================================
+
+  /**
+   * Which factory to use for creating Pubsub transport.
+   */
+  private final PubsubClientFactory pubsubFactory;
+
+  /**
+   * Pubsub topic to publish to.
+   */
+  private final ValueProvider<TopicPath> topic;
+
+  /**
+   * Coder for elements. It is the responsibility of the underlying Pubsub transport to
+   * re-encode element bytes if necessary, e.g. as Base64 strings.
+   */
+  private final Coder<T> elementCoder;
+
+  /**
+   * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
+   * Pubsub message publish timestamp instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Pubsub metadata field holding id for each element, or {@literal null}, in which case no
+   * record ids are generated (the constructor then forces {@link RecordIdMethod#NONE}).
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
+   * should be a small multiple of the number of available cores. Too small a number results
+   * in too much time lost to blocking Pubsub calls. Too large a number results in too many
+   * single-element batches being sent to Pubsub with high per-batch overhead.
+   */
+  private final int numShards;
+
+  /**
+   * Maximum number of messages per publish.
+   */
+  private final int publishBatchSize;
+
+  /**
+   * Maximum size of a publish batch, in bytes.
+   */
+  private final int publishBatchBytes;
+
+  /**
+   * Longest delay between receiving a message and pushing it to Pubsub.
+   */
+  private final Duration maxLatency;
+
+  /**
+   * How record ids should be generated for each record. Always {@link RecordIdMethod#NONE}
+   * when {@link #idLabel} is {@literal null}.
+   */
+  private final RecordIdMethod recordIdMethod;
+
+  /**
+   * In order to publish attributes, a formatting function is used to format the output into
+   * a {@link PubsubIO.PubsubMessage}. May be {@literal null}, in which case elements are
+   * encoded with {@link #elementCoder} and sent with no attributes.
+   */
+  private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
+
+  /**
+   * Fully parameterized constructor, visible for tests. If {@code idLabel} is null the record id
+   * method is forced to {@link RecordIdMethod#NONE}, whatever {@code recordIdMethod} says.
+   */
+  @VisibleForTesting
+  PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      ValueProvider<TopicPath> topic,
+      Coder<T> elementCoder,
+      String timestampLabel,
+      String idLabel,
+      int numShards,
+      int publishBatchSize,
+      int publishBatchBytes,
+      Duration maxLatency,
+      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
+      RecordIdMethod recordIdMethod) {
+    // Where to publish, and over which transport.
+    this.pubsubFactory = pubsubFactory;
+    this.topic = topic;
+
+    // How elements are turned into messages.
+    this.elementCoder = elementCoder;
+    this.formatFn = formatFn;
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    if (idLabel == null) {
+      // Without an id label there is nowhere to carry a record id.
+      this.recordIdMethod = RecordIdMethod.NONE;
+    } else {
+      this.recordIdMethod = recordIdMethod;
+    }
+
+    // Sharding and batching limits.
+    this.numShards = numShards;
+    this.publishBatchSize = publishBatchSize;
+    this.publishBatchBytes = publishBatchBytes;
+    this.maxLatency = maxLatency;
+  }
+
+  /**
+   * Create a sink that uses the default batch size, batch byte limit and maximum latency, and
+   * requests randomly generated record ids (which only takes effect when {@code idLabel} is
+   * non-null).
+   */
+  public PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      ValueProvider<TopicPath> topic,
+      Coder<T> elementCoder,
+      String timestampLabel,
+      String idLabel,
+      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
+      int numShards) {
+    this(
+        pubsubFactory,
+        topic,
+        elementCoder,
+        timestampLabel,
+        idLabel,
+        numShards,
+        DEFAULT_PUBLISH_BATCH_SIZE,
+        DEFAULT_PUBLISH_BATCH_BYTES,
+        DEFAULT_MAX_LATENCY,
+        formatFn,
+        RecordIdMethod.RANDOM);
+  }
+
+  /**
+   * Get the topic being written to, by resolving the topic {@link ValueProvider}.
+   *
+   * @return the result of calling {@code get()} on the topic provider
+   */
+  public TopicPath getTopic() {
+    return topic.get();
+  }
+
+  /**
+   * Get the {@link ValueProvider} for the topic being written to, without resolving it.
+   *
+   * @return the topic provider this sink was constructed with
+   */
+  public ValueProvider<TopicPath> getTopicProvider() {
+    return topic;
+  }
+
+  /**
+   * Get the timestamp label.
+   *
+   * @return the timestamp label this sink was constructed with; may be null
+   */
+  @Nullable
+  public String getTimestampLabel() {
+    return timestampLabel;
+  }
+
+  /**
+   * Get the id label.
+   *
+   * @return the id label this sink was constructed with; may be null
+   */
+  @Nullable
+  public String getIdLabel() {
+    return idLabel;
+  }
+
+  /**
+   * Get the format function used for PubSub attributes.
+   *
+   * @return the format function this sink was constructed with; may be null
+   */
+  @Nullable
+  public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() {
+    return formatFn;
+  }
+
+  /**
+   * Get the Coder used to encode output elements.
+   *
+   * @return the element coder this sink was constructed with
+   */
+  public Coder<T> getElementCoder() {
+    return elementCoder;
+  }
+
+  /**
+   * Build the sink. The input is re-windowed into the global window with a repeated trigger
+   * that fires once {@code publishBatchSize} elements have arrived or {@code maxLatency} of
+   * processing time has passed since the first element in the pane, whichever comes first.
+   * Elements are then converted and sharded, grouped by shard, and each group is published.
+   */
+  @Override
+  public PDone expand(PCollection<T> input) {
+    PCollection<T> windowed =
+        input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
+            .triggering(
+                Repeatedly.forever(
+                    AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
+                        AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(maxLatency))))
+            .discardingFiredPanes());
+
+    PCollection<KV<Integer, OutgoingMessage>> sharded =
+        windowed
+            .apply("PubsubUnboundedSink.Shard",
+                ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod)))
+            .setCoder(KvCoder.of(VarIntCoder.of(), CODER));
+
+    PCollection<KV<Integer, Iterable<OutgoingMessage>>> grouped =
+        sharded.apply(GroupByKey.<Integer, OutgoingMessage>create());
+
+    grouped.apply("PubsubUnboundedSink.Writer",
+        ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+            publishBatchSize, publishBatchBytes)));
+
+    return PDone.in(input.getPipeline());
+  }
+}