You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:01 UTC
[30/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the
google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
new file mode 100644
index 0000000..8fc1c19
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create
+ * and consume unbounded {@link PCollection PCollections}.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * Beam pipeline. Please refer to the documentation of corresponding
+ * {@link PipelineRunner PipelineRunners} for more details.
+ */
+public class PubsubIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
+
+  /** Factory for creating the Pubsub client used to manage transport. */
+  private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
+
+  /**
+   * Loose pattern for the project component of subscription and topic paths.
+   *
+   * <p>Project IDs must contain 6-63 lowercase letters, digits, or dashes, must start with a
+   * letter, and may not end with a dash. This pattern is intentionally looser than those rules:
+   * its middle characters may also be {@code ':'} or {@code '.'}, so it admits some IDs the
+   * service would reject. It is only meant for basic parsing of Pub/Sub paths.
+   */
+  private static final Pattern PROJECT_ID_REGEXP =
+      Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
+
+  /** Matches {@code projects/<project>/subscriptions/<subscription>}. */
+  private static final Pattern SUBSCRIPTION_REGEXP =
+      Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
+
+  /** Matches {@code projects/<project>/topics/<topic>}. */
+  private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
+
+  /** Matches the deprecated v1beta1 form {@code /subscriptions/<project>/<subscription>}. */
+  private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP =
+      Pattern.compile("/subscriptions/([^/]+)/(.+)");
+
+  /** Matches the deprecated v1beta1 form {@code /topics/<project>/<topic>}. */
+  private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
+
+  /**
+   * Character rules for subscription and topic names: a letter followed by one or more letters,
+   * digits, or any of {@code - . _ ~ % +}. Length limits are checked separately.
+   */
+  private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
+
+  /** Inclusive length bounds for subscription and topic names. */
+  private static final int PUBSUB_NAME_MIN_LENGTH = 3;
+  private static final int PUBSUB_NAME_MAX_LENGTH = 255;
+
+  // Sentinel paths (apparently for tests, per their names): subscription paths starting with one
+  // of the two prefixes, and the exact topic path below, skip validation and are kept verbatim.
+  private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
+  private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
+  private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
+
+  /**
+   * Checks that {@code project} looks like a Cloud project ID according to the loose
+   * {@code PROJECT_ID_REGEXP}. Used when parsing both subscription and topic paths.
+   *
+   * @throws IllegalArgumentException if {@code project} does not match the pattern
+   */
+  private static void validateProjectName(String project) {
+    if (!PROJECT_ID_REGEXP.matcher(project).matches()) {
+      // The same check serves topics and subscriptions, so the message names both.
+      throw new IllegalArgumentException(
+          "Illegal project name specified in Pubsub subscription or topic: " + project);
+    }
+  }
+
+ private static void validatePubsubName(String name) {
+ if (name.length() < PUBSUB_NAME_MIN_LENGTH) {
+ throw new IllegalArgumentException(
+ "Pubsub object name is shorter than 3 characters: " + name);
+ }
+ if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
+ throw new IllegalArgumentException(
+ "Pubsub object name is longer than 255 characters: " + name);
+ }
+
+ if (name.startsWith("goog")) {
+ throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
+ }
+
+ Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
+ if (!match.matches()) {
+ throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name
+ + " Please see Javadoc for naming rules.");
+ }
+ }
+
+  /**
+   * Adds the display data shared by the Pubsub source and sink: the timestamp and ID attribute
+   * names (each only when non-null) and, when a topic provider is given, the topic path.
+   */
+  private static void populateCommonDisplayData(DisplayData.Builder builder,
+      String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
+    builder
+        .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
+            .withLabel("Timestamp Label Attribute"))
+        .addIfNotNull(DisplayData.item("idLabel", idLabel)
+            .withLabel("ID Label Attribute"));
+
+    if (topic == null) {
+      return;
+    }
+
+    // Resolve the provider only when its value is accessible; otherwise show the provider itself.
+    String topicString;
+    if (topic.isAccessible()) {
+      topicString = topic.get().asPath();
+    } else {
+      topicString = topic.toString();
+    }
+    builder.add(DisplayData.item("topic", topicString).withLabel("Pubsub Topic"));
+  }
+
+  /**
+   * Class representing a Pub/Sub message. Each message contains a single message payload and
+   * a map of attached attributes.
+   *
+   * <p>Both the payload array and the attribute map are stored and returned exactly as given to
+   * the constructor; neither is copied or wrapped.
+   */
+  public static class PubsubMessage {
+
+    /** The message payload. */
+    private final byte[] message;
+
+    /** The attached attributes; may be {@code null} when the message was built without any. */
+    private final Map<String, String> attributes;
+
+    /**
+     * Creates a message.
+     *
+     * @param message the payload bytes
+     * @param attributes the attached attributes, or {@code null} for none
+     */
+    public PubsubMessage(byte[] message, Map<String, String> attributes) {
+      this.message = message;
+      this.attributes = attributes;
+    }
+
+    /**
+     * Returns the main PubSub message.
+     */
+    public byte[] getMessage() {
+      return message;
+    }
+
+    /**
+     * Returns the given attribute value. If no such attribute exists, or the message has no
+     * attribute map at all, returns null.
+     *
+     * @throws NullPointerException if {@code attribute} is null
+     */
+    @Nullable
+    public String getAttribute(String attribute) {
+      checkNotNull(attribute, "attribute");
+      // A message built with a null map simply has no attributes, rather than failing with NPE.
+      return attributes == null ? null : attributes.get(attribute);
+    }
+
+    /**
+     * Returns the full map of attributes: the same instance passed to the constructor, which may
+     * be {@code null}. The map is neither copied nor wrapped, so it is only as immutable as the
+     * map the message was built with.
+     */
+    public Map<String, String> getAttributeMap() {
+      return attributes;
+    }
+  }
+
+  /**
+   * Class representing a Cloud Pub/Sub Subscription.
+   */
+  public static class PubsubSubscription implements Serializable {
+
+    /** {@code NORMAL} for parsed paths; {@code FAKE} for sentinel paths kept verbatim. */
+    private enum Type {NORMAL, FAKE}
+
+    private final Type type;
+    /** Project ID; empty for {@code FAKE} subscriptions. */
+    private final String project;
+    /** Subscription name, or the whole original path for {@code FAKE} subscriptions. */
+    private final String subscription;
+
+    private PubsubSubscription(Type type, String project, String subscription) {
+      this.type = type;
+      this.project = project;
+      this.subscription = subscription;
+    }
+
+    /**
+     * Creates a class representing a Pub/Sub subscription from the specified subscription path.
+     *
+     * <p>Cloud Pub/Sub subscription paths should be of the form
+     * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the ID
+     * of the project the subscription belongs to. The deprecated v1beta1 form
+     * {@code /subscriptions/<project>/<subscription>} is still accepted, with a logged warning.
+     *
+     * <p>The {@code <project>} component must be 6-63 characters, start with a lowercase letter
+     * and end with a lowercase letter or digit. The {@code <subscription>} component must comply
+     * with the following requirements (the service may apply stricter rules):
+     *
+     * <ul>
+     *   <li>Can only contain letters, numbers, dashes ('-'), periods ('.'), underscores ('_'),
+     *   tildes ('~'), percent signs ('%') and plus signs ('+').</li>
+     *   <li>Must be between 3 and 255 characters.</li>
+     *   <li>Must begin with a letter.</li>
+     *   <li>Cannot begin with {@code 'goog'} prefix.</li>
+     * </ul>
+     *
+     * @throws IllegalArgumentException if {@code path} is in neither accepted form, or its
+     *     project or subscription component fails the rules above
+     */
+    public static PubsubSubscription fromPath(String path) {
+      if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)
+          || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) {
+        // Sentinel paths skip validation and are kept verbatim.
+        return new PubsubSubscription(Type.FAKE, "", path);
+      }
+
+      String projectName;
+      String subscriptionName;
+
+      Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
+      if (v1beta1Match.matches()) {
+        LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format "
+            + "projects/<project_id>/subscriptions/<subscription_name>");
+        projectName = v1beta1Match.group(1);
+        subscriptionName = v1beta1Match.group(2);
+      } else {
+        Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
+        if (!match.matches()) {
+          throw new IllegalArgumentException("Pubsub subscription is not in "
+              + "projects/<project_id>/subscriptions/<subscription_name> format: " + path);
+        }
+        projectName = match.group(1);
+        subscriptionName = match.group(2);
+      }
+
+      validateProjectName(projectName);
+      validatePubsubName(subscriptionName);
+      return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
+    }
+
+    /**
+     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
+     * v1beta1 API. Sentinel subscriptions return their original path unchanged.
+     *
+     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta1Path() {
+      if (type == Type.NORMAL) {
+        return "/subscriptions/" + project + "/" + subscription;
+      } else {
+        return subscription;
+      }
+    }
+
+    /**
+     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
+     * v1beta2 API, which is the same as {@link #asPath()}.
+     *
+     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta2Path() {
+      return asPath();
+    }
+
+    /**
+     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
+     * API, {@code projects/<project>/subscriptions/<subscription>}. Sentinel subscriptions return
+     * their original path unchanged.
+     */
+    public String asPath() {
+      if (type == Type.NORMAL) {
+        return "projects/" + project + "/subscriptions/" + subscription;
+      } else {
+        return subscription;
+      }
+    }
+  }
+
+ /**
+ * Used to build a {@link ValueProvider} for {@link PubsubSubscription}.
+ */
+ private static class SubscriptionTranslator
+ implements SerializableFunction<String, PubsubSubscription> {
+
+ @Override
+ public PubsubSubscription apply(String from) {
+ return PubsubSubscription.fromPath(from);
+ }
+ }
+
+ /**
+ * Used to build a {@link ValueProvider} for {@link SubscriptionPath}.
+ */
+ private static class SubscriptionPathTranslator
+ implements SerializableFunction<PubsubSubscription, SubscriptionPath> {
+
+ @Override
+ public SubscriptionPath apply(PubsubSubscription from) {
+ return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
+ }
+ }
+
+ /**
+ * Used to build a {@link ValueProvider} for {@link PubsubTopic}.
+ */
+ private static class TopicTranslator
+ implements SerializableFunction<String, PubsubTopic> {
+
+ @Override
+ public PubsubTopic apply(String from) {
+ return PubsubTopic.fromPath(from);
+ }
+ }
+
+ /**
+ * Used to build a {@link ValueProvider} for {@link TopicPath}.
+ */
+ private static class TopicPathTranslator
+ implements SerializableFunction<PubsubTopic, TopicPath> {
+
+ @Override
+ public TopicPath apply(PubsubTopic from) {
+ return PubsubClient.topicPathFromName(from.project, from.topic);
+ }
+ }
+
+ /**
+ * Used to build a {@link ValueProvider} for {@link ProjectPath}.
+ */
+ private static class ProjectPathTranslator
+ implements SerializableFunction<PubsubTopic, ProjectPath> {
+
+ @Override
+ public ProjectPath apply(PubsubTopic from) {
+ return PubsubClient.projectPathFromId(from.project);
+ }
+ }
+
+  /**
+   * Class representing a Cloud Pub/Sub Topic.
+   */
+  public static class PubsubTopic implements Serializable {
+
+    /** {@code NORMAL} for parsed paths; {@code FAKE} for the sentinel path kept verbatim. */
+    private enum Type {NORMAL, FAKE}
+
+    private final Type type;
+    /** Project ID; empty for {@code FAKE} topics. */
+    private final String project;
+    /** Topic name, or the whole original path for {@code FAKE} topics. */
+    private final String topic;
+
+    private PubsubTopic(Type type, String project, String topic) {
+      this.type = type;
+      this.project = project;
+      this.topic = topic;
+    }
+
+    /**
+     * Creates a class representing a Cloud Pub/Sub topic from the specified topic path.
+     *
+     * <p>Cloud Pub/Sub topic paths should be of the form
+     * {@code projects/<project>/topics/<topic>}, where {@code <project>} is the ID of the
+     * publishing project. The deprecated v1beta1 form {@code /topics/<project>/<topic>} is still
+     * accepted, with a logged warning.
+     *
+     * <p>The {@code <project>} component must be 6-63 characters, start with a lowercase letter
+     * and end with a lowercase letter or digit. The {@code <topic>} component must comply with
+     * the following requirements (the service may apply stricter rules):
+     *
+     * <ul>
+     *   <li>Can only contain letters, numbers, dashes ('-'), periods ('.'), underscores ('_'),
+     *   tildes ('~'), percent signs ('%') and plus signs ('+').</li>
+     *   <li>Must be between 3 and 255 characters.</li>
+     *   <li>Must begin with a letter.</li>
+     *   <li>Cannot begin with 'goog' prefix.</li>
+     * </ul>
+     *
+     * @throws IllegalArgumentException if {@code path} is in neither accepted form, or its
+     *     project or topic component fails the rules above
+     */
+    public static PubsubTopic fromPath(String path) {
+      if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) {
+        // The sentinel topic skips validation and is kept verbatim.
+        return new PubsubTopic(Type.FAKE, "", path);
+      }
+
+      String projectName;
+      String topicName;
+
+      Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
+      if (v1beta1Match.matches()) {
+        LOG.warn("Saw topic in v1beta1 format. Topics should be in the format "
+            + "projects/<project_id>/topics/<topic_name>");
+        projectName = v1beta1Match.group(1);
+        topicName = v1beta1Match.group(2);
+      } else {
+        Matcher match = TOPIC_REGEXP.matcher(path);
+        if (!match.matches()) {
+          throw new IllegalArgumentException(
+              "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
+        }
+        projectName = match.group(1);
+        topicName = match.group(2);
+      }
+
+      validateProjectName(projectName);
+      validatePubsubName(topicName);
+      return new PubsubTopic(Type.NORMAL, projectName, topicName);
+    }
+
+    /**
+     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
+     * v1beta1 API. The sentinel topic returns its original path unchanged.
+     *
+     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta1Path() {
+      if (type == Type.NORMAL) {
+        return "/topics/" + project + "/" + topic;
+      } else {
+        return topic;
+      }
+    }
+
+    /**
+     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
+     * v1beta2 API, which is the same as {@link #asPath()}.
+     *
+     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
+     */
+    @Deprecated
+    public String asV1Beta2Path() {
+      return asPath();
+    }
+
+    /**
+     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
+     * API, {@code projects/<project>/topics/<topic>}. The sentinel topic returns its original
+     * path unchanged.
+     */
+    public String asPath() {
+      if (type == Type.NORMAL) {
+        return "projects/" + project + "/topics/" + topic;
+      } else {
+        return topic;
+      }
+    }
+  }
+
+  /**
+   * Returns a new, unconfigured {@link Read} transform.
+   *
+   * <p>Before it is applied, either a topic or a subscription must be set, together with a coder.
+   */
+  public static <T> Read<T> read() {
+    Read<T> unconfigured = new Read<T>();
+    return unconfigured;
+  }
+
+  /**
+   * Returns a new, unconfigured {@link Write} transform.
+   *
+   * <p>A topic must be set before the transform is applied.
+   */
+  public static <T> Write<T> write() {
+    Write<T> unconfigured = new Write<T>();
+    return unconfigured;
+  }
+
+  /**
+   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
+   * returns a {@link PCollection} of elements of type {@code T} containing the items from the
+   * stream.
+   *
+   * <p>Exactly one of a topic ({@link #topic(String)}) or a subscription
+   * ({@link #subscription(String)}) must be set, and a coder must be supplied with
+   * {@link #withCoder(Coder)}; otherwise {@link #expand(PBegin)} throws
+   * {@link IllegalStateException}.
+   *
+   * <p>Configuration methods do not modify this object; each returns a new transform.
+   */
+  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    /** The Cloud Pub/Sub topic to read from. */
+    @Nullable
+    private final ValueProvider<PubsubTopic> topic;
+
+    /** The Cloud Pub/Sub subscription to read from. */
+    @Nullable
+    private final ValueProvider<PubsubSubscription> subscription;
+
+    /** The name of the message attribute to read timestamps from. */
+    @Nullable
+    private final String timestampLabel;
+
+    /** The name of the message attribute to read unique message IDs from. */
+    @Nullable
+    private final String idLabel;
+
+    /** The coder used to decode each record; also the default output coder. */
+    @Nullable
+    private final Coder<T> coder;
+
+    /** User function for parsing PubsubMessage object; null unless set by withAttributes. */
+    @Nullable
+    SimpleFunction<PubsubMessage, T> parseFn;
+
+    private Read() {
+      this(null, null, null, null, null, null, null);
+    }
+
+    private Read(String name, ValueProvider<PubsubSubscription> subscription,
+        ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
+        String idLabel,
+        SimpleFunction<PubsubMessage, T> parseFn) {
+      super(name);
+      this.subscription = subscription;
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.coder = coder;
+      this.idLabel = idLabel;
+      this.parseFn = parseFn;
+    }
+
+    /**
+     * Returns a transform that's like this one but reading from the
+     * given subscription. Any previously set topic is cleared.
+     *
+     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
+     * of the {@code subscription} string.
+     *
+     * <p>Multiple readers reading from the same subscription will each receive
+     * some arbitrary portion of the data. Most likely, separate readers should
+     * use their own subscriptions.
+     *
+     * <p>Does not modify this object.
+     *
+     * @throws IllegalArgumentException if {@code subscription} is not a valid subscription path
+     */
+    public Read<T> subscription(String subscription) {
+      return subscription(StaticValueProvider.of(subscription));
+    }
+
+    /**
+     * Like {@link #subscription(String)} but with a {@link ValueProvider}. The path is validated
+     * immediately only if the provider's value is already accessible. Any previously set topic is
+     * cleared.
+     */
+    public Read<T> subscription(ValueProvider<String> subscription) {
+      if (subscription.isAccessible()) {
+        // Validate.
+        PubsubSubscription.fromPath(subscription.get());
+      }
+      return new Read<>(
+          name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+          null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
+    }
+
+    /**
+     * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
+     * with {@link #subscription(String)}: any previously set subscription is cleared.
+     *
+     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
+     * of the {@code topic} string.
+     *
+     * <p>The Beam runner will start reading data published on this topic from the time the
+     * pipeline is started. Any data published on the topic before the pipeline is started will
+     * not be read by the runner.
+     *
+     * @throws IllegalArgumentException if {@code topic} is not a valid topic path
+     */
+    public Read<T> topic(String topic) {
+      return topic(StaticValueProvider.of(topic));
+    }
+
+    /**
+     * Like {@link #topic(String)} but with a {@link ValueProvider}. The path is validated
+     * immediately only if the provider's value is already accessible. Any previously set
+     * subscription is cleared.
+     */
+    public Read<T> topic(ValueProvider<String> topic) {
+      if (topic.isAccessible()) {
+        // Validate.
+        PubsubTopic.fromPath(topic.get());
+      }
+      return new Read<>(name, null /* reset subscription to null */,
+          NestedValueProvider.of(topic, new TopicTranslator()),
+          timestampLabel, coder, idLabel, parseFn);
+    }
+
+    /**
+     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
+     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
+     * parameter specifies the name of the attribute that contains the timestamp.
+     *
+     * <p>The timestamp value is expected to be represented in the attribute as either:
+     *
+     * <ul>
+     * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
+     * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
+     * value for this attribute.
+     * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
+     * sub-second component of the timestamp is optional, and digits beyond the first three
+     * (i.e., time units smaller than milliseconds) will be ignored.
+     * </ul>
+     *
+     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+     * the first time it sees each record. All windowing will be done relative to these
+     * timestamps.
+     *
+     * <p>By default, windows are emitted based on an estimate of when this source is likely
+     * done producing data for a given timestamp (referred to as the Watermark; see
+     * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
+     * specified with the windowing strategy – by default it will be output immediately.
+     *
+     * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
+     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+     *
+     * <p>Does not modify this object.
+     *
+     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
+     */
+    public Read<T> timestampLabel(String timestampLabel) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
+     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
+     * parameter specifies the attribute name. The value of the attribute can be any string
+     * that uniquely identifies this record.
+     *
+     * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
+     * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+     * be delivered, and deduplication of the stream will be strictly best effort.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> idLabel(String idLabel) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Returns a transform that's like this one but that uses the given
+     * {@link Coder} to decode each record into a value of type {@code T}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      return new Read<>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Causes the source to return a PubsubMessage that includes Pubsub attributes.
+     * The user must supply a parsing function to transform the PubsubMessage into an output type.
+     *
+     * <p>A {@link Coder} for the output type {@code T} must still be set with
+     * {@link #withCoder(Coder)}: {@link #expand(PBegin)} rejects a transform without one, and it
+     * is used as the default output coder.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+      return new Read<T>(
+          name, subscription, topic, timestampLabel, coder, idLabel,
+          parseFn);
+    }
+
+    /**
+     * Expands into a {@link PubsubUnboundedSource} reading from the configured topic or
+     * subscription.
+     *
+     * @throws IllegalStateException if neither or both of topic and subscription are set, or if
+     *     no coder is set
+     */
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (topic == null && subscription == null) {
+        throw new IllegalStateException("Need to set either the topic or the subscription for "
+            + "a PubsubIO.Read transform");
+      }
+      if (topic != null && subscription != null) {
+        throw new IllegalStateException("Can't set both the topic and the subscription for "
+            + "a PubsubIO.Read transform");
+      }
+      if (coder == null) {
+        throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
+            + "the withCoder method.");
+      }
+
+      // Exactly one of topic/subscription is non-null here; the other's paths stay null.
+      @Nullable ValueProvider<ProjectPath> projectPath =
+          topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
+      @Nullable ValueProvider<TopicPath> topicPath =
+          topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
+      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
+          subscription == null
+              ? null
+              : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
+      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
+          FACTORY, projectPath, topicPath, subscriptionPath,
+          coder, timestampLabel, idLabel, parseFn);
+      return input.getPipeline().apply(source);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+
+      if (subscription != null) {
+        String subscriptionString = subscription.isAccessible()
+            ? subscription.get().asPath() : subscription.toString();
+        builder.add(DisplayData.item("subscription", subscriptionString)
+            .withLabel("Pubsub Subscription"));
+      }
+    }
+
+    @Override
+    protected Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    /**
+     * Get the topic being read from, or null when reading from a subscription.
+     *
+     * <p>This calls {@code get()} on the topic provider, so it presumably fails if the value is
+     * not yet available; use {@link #getTopicProvider()} in that case.
+     */
+    @Nullable
+    public PubsubTopic getTopic() {
+      return topic == null ? null : topic.get();
+    }
+
+    /**
+     * Get the {@link ValueProvider} for the topic being read from, or null when reading from a
+     * subscription.
+     */
+    @Nullable
+    public ValueProvider<PubsubTopic> getTopicProvider() {
+      return topic;
+    }
+
+    /**
+     * Get the subscription being read from, or null when reading from a topic.
+     *
+     * <p>This calls {@code get()} on the subscription provider, so it presumably fails if the
+     * value is not yet available; use {@link #getSubscriptionProvider()} in that case.
+     */
+    @Nullable
+    public PubsubSubscription getSubscription() {
+      return subscription == null ? null : subscription.get();
+    }
+
+    /**
+     * Get the {@link ValueProvider} for the subscription being read from, or null when reading
+     * from a topic.
+     */
+    @Nullable
+    public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
+      return subscription;
+    }
+
+    /**
+     * Get the timestamp label.
+     */
+    @Nullable
+    public String getTimestampLabel() {
+      return timestampLabel;
+    }
+
+    /**
+     * Get the id label.
+     */
+    @Nullable
+    public String getIdLabel() {
+      return idLabel;
+    }
+
+    /**
+     * Get the {@link Coder} used for the transform's output.
+     */
+    @Nullable
+    public Coder<T> getCoder() {
+      return coder;
+    }
+
+    /**
+     * Get the parse function used for PubSub attributes, or null if none was set.
+     */
+    @Nullable
+    public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
+      return parseFn;
+    }
+
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Not instantiable: {@code PubsubIO} only hosts static factories and nested types.
+   */
+  private PubsubIO() {
+  }
+
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} of elements of type {@code T} to a
+   * Cloud Pub/Sub topic.
+   *
+   * <p>Unbounded inputs are written through {@link PubsubUnboundedSink}. Bounded inputs are
+   * published in batches by {@link PubsubBoundedWriter}, which turns each element into a message
+   * with the function given to {@link #withAttributes(SimpleFunction)} or, if none was given, by
+   * encoding it with the coder set via {@link #withCoder(Coder)}.
+   *
+   * <p>Configuration methods do not modify this object; each returns a new transform.
+   */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+
+    /** The Cloud Pub/Sub topic to publish to. */
+    @Nullable
+    private final ValueProvider<PubsubTopic> topic;
+    /** The name of the message attribute to publish message timestamps in. */
+    @Nullable
+    private final String timestampLabel;
+    /** The name of the message attribute to publish unique message IDs in. */
+    @Nullable
+    private final String idLabel;
+    /** The input type Coder. */
+    private final Coder<T> coder;
+    /** The format function for input PubsubMessage objects; null unless set by withAttributes. */
+    SimpleFunction<T, PubsubMessage> formatFn;
+
+    private Write() {
+      this(null, null, null, null, null, null);
+    }
+
+    private Write(
+        String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
+        String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
+      super(name);
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.idLabel = idLabel;
+      this.coder = coder;
+      this.formatFn = formatFn;
+    }
+
+    /**
+     * Creates a transform that publishes to the specified topic.
+     *
+     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
+     * {@code topic} string.
+     *
+     * @throws IllegalArgumentException if {@code topic} is not a valid topic path
+     */
+    public Write<T> topic(String topic) {
+      return topic(StaticValueProvider.of(topic));
+    }
+
+    /**
+     * Like {@link #topic(String)} but with a {@link ValueProvider}. The path is validated
+     * immediately only if the provider's value is already accessible.
+     */
+    public Write<T> topic(ValueProvider<String> topic) {
+      if (topic.isAccessible()) {
+        // Validate eagerly, as Read#topic does, so a malformed path fails at construction time
+        // instead of when the topic is first resolved.
+        PubsubTopic.fromPath(topic.get());
+      }
+      return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
+          timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Creates a transform that writes to Pub/Sub, adding each record's timestamp to the published
+     * messages in an attribute with the specified name. The value of the attribute will be a
+     * number representing the number of milliseconds since the Unix epoch. For example, if using
+     * the Joda time classes, {@link Instant#Instant(long)} can be used to parse this value.
+     *
+     * <p>If the output from this sink is being read by another Beam pipeline, then
+     * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
+     * these timestamps from the appropriate attribute.
+     */
+    public Write<T> timestampLabel(String timestampLabel) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
+     * published messages in an attribute with the specified name. The value of the attribute is
+     * an opaque string.
+     *
+     * <p>If the output from this sink is being read by another Beam pipeline, then
+     * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads
+     * these unique identifiers from the appropriate attribute.
+     *
+     * <p>For bounded inputs the ID attribute is not written: {@link PubsubBoundedWriter} ignores
+     * this label.
+     */
+    public Write<T> idLabel(String idLabel) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Returns a new transform that's like this one
+     * but that uses the given {@link Coder} to encode each of
+     * the elements of the input {@link PCollection} into an
+     * output record.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write<T> withCoder(Coder<T> coder) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Used to write a PubSub message together with PubSub attributes. The user-supplied format
+     * function translates the input type T to a PubsubMessage object, which is used by the sink
+     * to separately set the PubSub message's payload and attributes.
+     */
+    public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
+      return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
+
+    /**
+     * Writes bounded inputs with {@link PubsubBoundedWriter} and unbounded inputs with a
+     * {@link PubsubUnboundedSink}.
+     *
+     * @throws IllegalStateException if no topic has been set
+     */
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (topic == null) {
+        throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
+      }
+      switch (input.isBounded()) {
+        case BOUNDED:
+          input.apply(ParDo.of(new PubsubBoundedWriter()));
+          return PDone.in(input.getPipeline());
+        case UNBOUNDED:
+          return input.apply(new PubsubUnboundedSink<T>(
+              FACTORY,
+              NestedValueProvider.of(topic, new TopicPathTranslator()),
+              coder,
+              timestampLabel,
+              idLabel,
+              formatFn,
+              100 /* numShards */));
+        default:
+          // Unreachable while boundedness has only the two values handled above.
+          throw new IllegalStateException(
+              "Unexpected boundedness of PubsubIO.Write input: " + input.isBounded());
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+    }
+
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+
+    /**
+     * Returns the PubSub topic being written to.
+     *
+     * <p>This calls {@code get()} on the topic provider, so it presumably fails if the value is
+     * not yet available; use {@link #getTopicProvider()} in that case.
+     */
+    @Nullable
+    public PubsubTopic getTopic() {
+      return (topic == null) ? null : topic.get();
+    }
+
+    /**
+     * Returns the {@link ValueProvider} for the topic being written to.
+     */
+    @Nullable
+    public ValueProvider<PubsubTopic> getTopicProvider() {
+      return topic;
+    }
+
+    /**
+     * Returns the timestamp label.
+     */
+    @Nullable
+    public String getTimestampLabel() {
+      return timestampLabel;
+    }
+
+    /**
+     * Returns the id label.
+     */
+    @Nullable
+    public String getIdLabel() {
+      return idLabel;
+    }
+
+    /**
+     * Returns the output coder.
+     */
+    @Nullable
+    public Coder<T> getCoder() {
+      return coder;
+    }
+
+    /**
+     * Returns the formatting function used if publishing attributes.
+     */
+    @Nullable
+    public SimpleFunction<T, PubsubMessage> getFormatFn() {
+      return formatFn;
+    }
+
+    /**
+     * Writer to Pubsub which batches messages from bounded collections.
+     *
+     * <p>Messages are buffered and published whenever {@code MAX_PUBLISH_BATCH_SIZE} of them have
+     * accumulated; any remainder is published when the bundle finishes. The ID label is not
+     * passed to the client and every message's record ID is null.
+     *
+     * <p>Public so can be suppressed by runners.
+     */
+    public class PubsubBoundedWriter extends DoFn<T, Void> {
+
+      private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+      private transient List<OutgoingMessage> output;
+      private transient PubsubClient pubsubClient;
+
+      @StartBundle
+      public void startBundle(Context c) throws IOException {
+        this.output = new ArrayList<>();
+        // NOTE: idLabel is ignored.
+        this.pubsubClient =
+            FACTORY.newClient(timestampLabel, null,
+                c.getPipelineOptions().as(PubsubOptions.class));
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        byte[] payload;
+        Map<String, String> attributes = null;
+        if (formatFn != null) {
+          PubsubMessage message = formatFn.apply(c.element());
+          payload = message.getMessage();
+          attributes = message.getAttributeMap();
+        } else {
+          payload = CoderUtils.encodeToByteArray(getCoder(), c.element());
+        }
+        // NOTE: The record id is always null.
+        OutgoingMessage message =
+            new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null);
+        output.add(message);
+
+        if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
+          publish();
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(Context c) throws IOException {
+        try {
+          if (!output.isEmpty()) {
+            publish();
+          }
+        } finally {
+          // Release the client even when the final publish fails, so it is not leaked.
+          output = null;
+          PubsubClient client = pubsubClient;
+          pubsubClient = null;
+          client.close();
+        }
+      }
+
+      /** Publishes all buffered messages to the topic and clears the buffer. */
+      private void publish() throws IOException {
+        PubsubTopic target = getTopic();
+        int published = pubsubClient.publish(
+            PubsubClient.topicPathFromName(target.project, target.topic),
+            output);
+        checkState(published == output.size(),
+            "Attempted to publish %s messages but %s were published", output.size(), published);
+        output.clear();
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.delegate(Write.this);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
new file mode 100644
index 0000000..e290a6b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+
+/**
+ * A Pubsub client using JSON transport.
+ */
+class PubsubJsonClient extends PubsubClient {
+
+  private static class PubsubJsonClientFactory implements PubsubClientFactory {
+    /**
+     * Returns {@code httpRequestInitializer} unchanged when {@code credential} is null;
+     * otherwise returns a chain of a credential adapter and {@code httpRequestInitializer}.
+     */
+    private static HttpRequestInitializer chainHttpRequestInitializer(
+        Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+      if (credential == null) {
+        return httpRequestInitializer;
+      }
+      return new ChainingHttpRequestInitializer(
+          new HttpCredentialsAdapter(credential), httpRequestInitializer);
+    }
+
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        throws IOException {
+      Pubsub pubsub = new Builder(
+          Transport.getTransport(),
+          Transport.getJsonFactory(),
+          chainHttpRequestInitializer(
+              options.getGcpCredential(),
+              // Do not log 404. It clutters the output and is possibly even required by the caller.
+              new RetryHttpRequestInitializer(ImmutableList.of(404))))
+          .setRootUrl(options.getPubsubRootUrl())
+          .setApplicationName(options.getAppName())
+          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
+          .build();
+      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+    }
+
+    @Override
+    public String getKind() {
+      return "Json";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using Json transport.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Underlying JSON transport.
+   */
+  private final Pubsub pubsub;
+
+  @VisibleForTesting
+  PubsubJsonClient(
+      @Nullable String timestampLabel,
+      @Nullable String idLabel,
+      Pubsub pubsub) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.pubsub = pubsub;
+  }
+
+  @Override
+  public void close() {
+    // Nothing to close.
+  }
+
+  /**
+   * Publishes {@code outgoingMessages} to {@code topic} in a single request.
+   *
+   * <p>When a timestamp label is configured, each message's timestamp is added as an attribute
+   * under that label; when an id label is configured, each non-empty record id is added the
+   * same way. The caller's attribute maps are copied and never modified.
+   *
+   * @return the number of message ids returned by the service
+   */
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
+
+      // Work on a private copy: the caller's map may be shared or immutable, and a retried
+      // publish must not observe labels added by an earlier failed attempt.
+      Map<String, String> attributes = null;
+      if (outgoingMessage.attributes != null) {
+        attributes = new TreeMap<>(outgoingMessage.attributes);
+      } else if (timestampLabel != null || idLabel != null) {
+        attributes = new TreeMap<>();
+      }
+
+      if (timestampLabel != null) {
+        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        attributes.put(idLabel, outgoingMessage.recordId);
+      }
+
+      if (attributes != null) {
+        pubsubMessage.setAttributes(attributes);
+      }
+
+      pubsubMessages.add(pubsubMessage);
+    }
+    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
+    PublishResponse response = pubsub.projects()
+        .topics()
+        .publish(topic.getPath(), request)
+        .execute();
+    List<String> messageIds = response.getMessageIds();
+    return messageIds == null ? 0 : messageIds.size();
+  }
+
+  /**
+   * Pulls up to {@code batchSize} messages from {@code subscription}.
+   *
+   * <p>Each message's timestamp comes from {@link #extractTimestamp} using the timestamp label (if
+   * any) and the Pubsub publish time. Its record id is taken from the id label attribute when
+   * present and non-empty, falling back to the Pubsub message id.
+   *
+   * @return the received messages, or an empty list if none were returned
+   */
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = new PullRequest()
+        .setReturnImmediately(returnImmediately)
+        .setMaxMessages(batchSize);
+    PullResponse response = pubsub.projects()
+        .subscriptions()
+        .pull(subscription.getPath(), request)
+        .execute();
+    List<ReceivedMessage> receivedMessages = response.getReceivedMessages();
+    if (receivedMessages == null || receivedMessages.isEmpty()) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new ArrayList<>(receivedMessages.size());
+    for (ReceivedMessage message : receivedMessages) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.decodeData();
+
+      // Timestamp.
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, pubsubMessage.getPublishTime(), attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any.
+      @Nullable String recordId = null;
+      if (idLabel != null && attributes != null) {
+        recordId = attributes.get(idLabel);
+      }
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch, ackId, recordId));
+    }
+
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
+    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+    pubsub.projects()
+        .subscriptions()
+        .acknowledge(subscription.getPath(), request)
+        .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest request =
+        new ModifyAckDeadlineRequest().setAckIds(ackIds)
+            .setAckDeadlineSeconds(deadlineSeconds);
+    pubsub.projects()
+        .subscriptions()
+        .modifyAckDeadline(subscription.getPath(), request)
+        .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+        .topics()
+        .create(topic.getPath(), new Topic())
+        .execute(); // ignore Topic result.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+        .topics()
+        .delete(topic.getPath())
+        .execute(); // ignore Empty result.
+  }
+
+  /**
+   * Lists all topics in {@code project}, following page tokens until every page has been read.
+   */
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    Pubsub.Projects.Topics.List request = pubsub.projects().topics().list(project.getPath());
+    List<TopicPath> topics = new ArrayList<>();
+    while (true) {
+      ListTopicsResponse response = request.execute();
+      if (response.getTopics() != null) {
+        for (Topic topic : response.getTopics()) {
+          topics.add(topicPathFromPath(topic.getName()));
+        }
+      }
+      String nextPageToken = response.getNextPageToken();
+      if (Strings.isNullOrEmpty(nextPageToken)) {
+        break;
+      }
+      request.setPageToken(nextPageToken);
+    }
+    return topics;
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription request = new Subscription()
+        .setTopic(topic.getPath())
+        .setAckDeadlineSeconds(ackDeadlineSeconds);
+    pubsub.projects()
+        .subscriptions()
+        .create(subscription.getPath(), request)
+        .execute(); // ignore Subscription result.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    pubsub.projects()
+        .subscriptions()
+        .delete(subscription.getPath())
+        .execute(); // ignore Empty result.
+  }
+
+  /**
+   * Lists the subscriptions in {@code project} that are attached to {@code topic}, following page
+   * tokens until every page has been read.
+   */
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException {
+    Pubsub.Projects.Subscriptions.List request =
+        pubsub.projects().subscriptions().list(project.getPath());
+    List<SubscriptionPath> subscriptions = new ArrayList<>();
+    while (true) {
+      ListSubscriptionsResponse response = request.execute();
+      if (response.getSubscriptions() != null) {
+        for (Subscription subscription : response.getSubscriptions()) {
+          // Compare from the non-null side in case a subscription has no topic set.
+          if (topic.getPath().equals(subscription.getTopic())) {
+            subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+          }
+        }
+      }
+      String nextPageToken = response.getNextPageToken();
+      if (Strings.isNullOrEmpty(nextPageToken)) {
+        break;
+      }
+      request.setPageToken(nextPageToken);
+    }
+    return subscriptions;
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+    return response.getAckDeadlineSeconds();
+  }
+
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
new file mode 100644
index 0000000..c88576e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
+ */
+class PubsubTestClient extends PubsubClient implements Serializable {
+  /**
+   * Mimic the state of the simulated Pubsub 'service'.
+   *
+   * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
+   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
+   * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
+   * following data structures over all clients and must resort to a static.
+   */
+  private static class State {
+    /**
+     * True if has been primed for a test but not yet validated.
+     */
+    boolean isActive;
+
+    /**
+     * Publish mode only: Only publish calls for this topic are allowed.
+     */
+    @Nullable
+    TopicPath expectedTopic;
+
+    /**
+     * Publish mode only: Messages yet to be seen in a {@link #publish} call.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+    /**
+     * Publish mode only: Messages which should throw when first sent to simulate transient publish
+     * failure.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingFailingOutgoingMessages;
+
+    /**
+     * Pull mode only: Clock from which to get current time.
+     */
+    @Nullable
+    Clock clock;
+
+    /**
+     * Pull mode only: Only pull calls for this subscription are allowed.
+     */
+    @Nullable
+    SubscriptionPath expectedSubscription;
+
+    /**
+     * Pull mode only: Timeout to simulate.
+     */
+    int ackTimeoutSec;
+
+    /**
+     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
+     */
+    @Nullable
+    List<IncomingMessage> remainingPendingIncomingMessages;
+
+    /**
+     * Pull mode only: Messages which have been returned from a {@link #pull} call and
+     * not yet ACKed by an {@link #acknowledge} call.
+     */
+    @Nullable
+    Map<String, IncomingMessage> pendingAckIncomingMessages;
+
+    /**
+     * Pull mode only: When above messages are due to have their ACK deadlines expire.
+     */
+    @Nullable
+    Map<String, Long> ackDeadline;
+  }
+
+  private static final State STATE = new State();
+
+  /** Closing the factory will validate all expected messages were processed. */
+  public interface PubsubTestClientFactory
+      extends PubsubClientFactory, Closeable, Serializable {
+  }
+
+  /**
+   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point final validation will
+   * occur.
+   */
+  static PubsubTestClientFactory createFactoryForPublish(
+      final TopicPath expectedTopic,
+      final Iterable<OutgoingMessage> expectedOutgoingMessages,
+      final Iterable<OutgoingMessage> failingOutgoingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      STATE.expectedTopic = expectedTopic;
+      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PublishTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
+              "Still waiting for %s messages to be published",
+              STATE.remainingExpectedOutgoingMessages.size());
+          STATE.isActive = false;
+          // Clear every publish-mode field; otherwise inPublishMode() would keep returning true
+          // during a later pull-mode test.
+          STATE.expectedTopic = null;
+          STATE.remainingExpectedOutgoingMessages = null;
+          STATE.remainingFailingOutgoingMessages = null;
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point final validation will
+   * occur.
+   */
+  public static PubsubTestClientFactory createFactoryForPull(
+      final Clock clock,
+      final SubscriptionPath expectedSubscription,
+      final int ackTimeoutSec,
+      final Iterable<IncomingMessage> expectedIncomingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      STATE.clock = clock;
+      STATE.expectedSubscription = expectedSubscription;
+      STATE.ackTimeoutSec = ackTimeoutSec;
+      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
+      STATE.pendingAckIncomingMessages = new HashMap<>();
+      STATE.ackDeadline = new HashMap<>();
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PullTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
+              "Still waiting for %s messages to be pulled",
+              STATE.remainingPendingIncomingMessages.size());
+          checkState(STATE.pendingAckIncomingMessages.isEmpty(),
+              "Still waiting for %s messages to be ACKed",
+              STATE.pendingAckIncomingMessages.size());
+          checkState(STATE.ackDeadline.isEmpty(),
+              "Still waiting for %s messages to be ACKed",
+              STATE.ackDeadline.size());
+          STATE.isActive = false;
+          // Clear every pull-mode field; otherwise inPullMode() would keep returning true
+          // during a later publish-mode test.
+          STATE.clock = null;
+          STATE.expectedSubscription = null;
+          STATE.remainingPendingIncomingMessages = null;
+          STATE.pendingAckIncomingMessages = null;
+          STATE.ackDeadline = null;
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a factory whose clients accept a single {@link #createSubscription} call. Closing the
+   * factory checks that exactly one subscription was created.
+   */
+  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
+    return new PubsubTestClientFactory() {
+      int numCalls = 0;
+
+      @Override
+      public void close() throws IOException {
+        checkState(
+            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
+      }
+
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient() {
+          @Override
+          public void createSubscription(
+              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
+              throws IOException {
+            checkState(numCalls == 0, "Expected at most one subscription to be created");
+            numCalls++;
+          }
+        };
+      }
+
+      @Override
+      public String getKind() {
+        return "CreateSubscriptionTest";
+      }
+    };
+  }
+
+  /**
+   * Return true if in pull mode.
+   */
+  private boolean inPullMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedSubscription != null;
+  }
+
+  /**
+   * Return true if in publish mode.
+   */
+  private boolean inPublishMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedTopic != null;
+  }
+
+  /**
+   * For pull mode only:
+   * Track progression of time according to the {@link Clock} passed to
+   * {@link #createFactoryForPull}. This will simulate Pubsub expiring outstanding ACKs.
+   */
+  public void advance() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only advance in pull mode");
+      long now = STATE.clock.currentTimeMillis();
+      // Any messages whose ACKs timed out are available for re-pulling.
+      Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
+      while (deadlineItr.hasNext()) {
+        Map.Entry<String, Long> entry = deadlineItr.next();
+        if (entry.getValue() <= now) {
+          STATE.remainingPendingIncomingMessages.add(
+              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
+          deadlineItr.remove();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public int publish(
+      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+    synchronized (STATE) {
+      checkState(inPublishMode(), "Can only publish in publish mode");
+      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
+          STATE.expectedTopic);
+      for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
+          throw new RuntimeException("Simulating failure for " + outgoingMessage);
+        }
+        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
+            "Unexpected outgoing message %s", outgoingMessage);
+      }
+      return outgoingMessages.size();
+    }
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+      boolean returnImmediately) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only pull in pull mode");
+      long now = STATE.clock.currentTimeMillis();
+      checkState(requestTimeMsSinceEpoch == now,
+          "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
+      checkState(subscription.equals(STATE.expectedSubscription),
+          "Subscription %s does not match expected %s", subscription,
+          STATE.expectedSubscription);
+      checkState(returnImmediately, "Pull only supported if returning immediately");
+
+      List<IncomingMessage> incomingMessages = new ArrayList<>();
+      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
+      while (pendItr.hasNext()) {
+        IncomingMessage incomingMessage = pendItr.next();
+        pendItr.remove();
+        IncomingMessage incomingMessageWithRequestTime =
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+        incomingMessages.add(incomingMessageWithRequestTime);
+        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
+            incomingMessageWithRequestTime);
+        // Use long arithmetic so large timeouts cannot overflow int.
+        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
+            requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000L);
+        if (incomingMessages.size() >= batchSize) {
+          break;
+        }
+      }
+      return incomingMessages;
+    }
+  }
+
+  @Override
+  public void acknowledge(
+      SubscriptionPath subscription,
+      List<String> ackIds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only acknowledge in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+          "Subscription %s does not match expected %s", subscription,
+          STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        checkState(STATE.ackDeadline.remove(ackId) != null,
+            "No message with ACK id %s is waiting for an ACK", ackId);
+        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
+            "No message with ACK id %s is waiting for an ACK", ackId);
+      }
+    }
+  }
+
+  /**
+   * Extends the deadline of each listed ACK id when {@code deadlineSeconds} is positive;
+   * otherwise immediately returns the message to the pending queue for re-pulling.
+   */
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+          "Subscription %s does not match expected %s", subscription,
+          STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        if (deadlineSeconds > 0) {
+          checkState(STATE.ackDeadline.remove(ackId) != null,
+              "No message with ACK id %s is waiting for an ACK", ackId);
+          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
+              "No message with ACK id %s is waiting for an ACK", ackId);
+          // Use long arithmetic so large deadlines cannot overflow int.
+          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000L);
+        } else {
+          checkState(STATE.ackDeadline.remove(ackId) != null,
+              "No message with ACK id %s is waiting for an ACK", ackId);
+          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
+          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
+          STATE.remainingPendingIncomingMessages.add(message);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(
+      ProjectPath project, TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    synchronized (STATE) {
+      return STATE.ackTimeoutSec;
+    }
+  }
+
+  @Override
+  public boolean isEOF() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only check EOF in pull mode");
+      return STATE.remainingPendingIncomingMessages.isEmpty();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
new file mode 100644
index 0000000..3ce9224
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
+
+/**
+ * A PTransform which streams messages to Pubsub.
+ * <ul>
+ * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which
+ * publishes as a side effect. (In the future we want to design and switch to a custom
+ * {@code UnboundedSink} implementation so as to gain access to system watermark and
+ * end-of-pipeline cleanup.)
+ * <li>We try to send messages in batches while also limiting send latency.
+ * <li>No stats are logged. Rather, some counters are used to keep track of elements and batches.
+ * <li>Though some background threads are used by the underlying netty system, all actual Pubsub
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * to execute concurrently and hide latency.
+ * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
+ * to dedup messages.
+ * </ul>
+ */
+public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
+  /**
+   * Default maximum number of messages per publish.
+   */
+  private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
+
+  /**
+   * Default maximum size of a publish batch, in bytes.
+   */
+  private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
+
+  /**
+   * Default longest delay between receiving a message and pushing it to Pubsub.
+   */
+  private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
+
+  /**
+   * Coder for conveying outgoing messages between internal stages.
+   *
+   * <p>Fields are written in a fixed order: payload bytes, attributes (nullable), timestamp in
+   * milliseconds since epoch, record id (nullable). {@link #decode} reads them back in the same
+   * order.
+   */
+  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
+    private static final NullableCoder<String> RECORD_ID_CODER =
+        NullableCoder.of(StringUtf8Coder.of());
+    private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
+        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    @Override
+    public void encode(
+        OutgoingMessage value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
+      ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
+      RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
+    }
+
+    @Override
+    public OutgoingMessage decode(
+        InputStream inStream, Context context) throws CoderException, IOException {
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
+      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
+      return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
+    }
+  }
+
+  @VisibleForTesting
+  static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
+
+  // ================================================================================
+  // RecordIdMethod
+  // ================================================================================
+
+  /**
+   * Specify how record ids are to be generated.
+   */
+  @VisibleForTesting
+  enum RecordIdMethod {
+    /** Leave null. */
+    NONE,
+    /** Generate randomly. */
+    RANDOM,
+    /** Generate deterministically. For testing only. */
+    DETERMINISTIC
+  }
+
+  // ================================================================================
+  // ShardFn
+  // ================================================================================
+
+  /**
+   * Convert elements to messages and assign each one to a random shard.
+   */
+  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
+    private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
+    private final Coder<T> elementCoder;
+    private final int numShards;
+    private final RecordIdMethod recordIdMethod;
+
+    /** Converts elements to Pubsub messages, or {@code null} to encode with the element coder. */
+    @Nullable
+    private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
+
+    ShardFn(Coder<T> elementCoder, int numShards,
+        @Nullable SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
+        RecordIdMethod recordIdMethod) {
+      this.elementCoder = elementCoder;
+      this.numShards = numShards;
+      this.formatFn = formatFn;
+      this.recordIdMethod = recordIdMethod;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      elementCounter.inc();
+      byte[] elementBytes;
+      Map<String, String> attributes;
+      if (formatFn != null) {
+        // The format function supplies both the payload and the Pubsub attributes.
+        PubsubIO.PubsubMessage message = formatFn.apply(c.element());
+        elementBytes = message.getMessage();
+        attributes = message.getAttributeMap();
+      } else {
+        // Without a format function the payload is the coder-encoded element and no
+        // attributes are attached.
+        elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+        attributes = ImmutableMap.<String, String>of();
+      }
+
+      long timestampMsSinceEpoch = c.timestamp().getMillis();
+      @Nullable String recordId = null;
+      switch (recordIdMethod) {
+        case NONE:
+          break;
+        case DETERMINISTIC:
+          // Only the payload bytes are hashed; attributes do not contribute to the id.
+          recordId = Hashing.murmur3_128().hashBytes(elementBytes).toString();
+          break;
+        case RANDOM:
+          // Since these elements go through a GroupByKey, any failures while sending to
+          // Pubsub will be retried without falling back and generating a new record id.
+          // Thus even though we may send the same message to Pubsub twice, it is guaranteed
+          // to have the same record id.
+          recordId = UUID.randomUUID().toString();
+          break;
+      }
+      // Spread messages uniformly over [0, numShards) so publishing can proceed in parallel.
+      c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
+          new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+              recordId)));
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("numShards", numShards));
+    }
+  }
+
+  // ================================================================================
+  // WriterFn
+  // ================================================================================
+
+  /**
+   * Publish messages to Pubsub in batches.
+   */
+  private static class WriterFn
+      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+    private final PubsubClientFactory pubsubFactory;
+    private final ValueProvider<TopicPath> topic;
+    private final String timestampLabel;
+    private final String idLabel;
+    private final int publishBatchSize;
+    private final int publishBatchBytes;
+
+    /**
+     * Client on which to talk to Pubsub. Non-null only between {@link #startBundle} and
+     * {@link #finishBundle}.
+     */
+    @Nullable
+    private transient PubsubClient pubsubClient;
+
+    private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
+    private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements");
+    private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes");
+
+    WriterFn(
+        PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
+        String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
+      this.pubsubFactory = pubsubFactory;
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.idLabel = idLabel;
+      this.publishBatchSize = publishBatchSize;
+      this.publishBatchBytes = publishBatchBytes;
+    }
+
+    /**
+     * BLOCKING
+     * Send {@code messages} as a batch to Pubsub.
+     *
+     * @param messages the batch to publish
+     * @param bytes total payload bytes in {@code messages}, used only for the byte counter
+     * @throws IllegalStateException if the client reports fewer successful publishes than sent
+     */
+    private void publishBatch(List<OutgoingMessage> messages, int bytes)
+        throws IOException {
+      int n = pubsubClient.publish(topic.get(), messages);
+      checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful",
+          messages.size(), n);
+      batchCounter.inc();
+      elementCounter.inc(messages.size());
+      byteCounter.inc(bytes);
+    }
+
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
+      pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+          c.getPipelineOptions().as(PubsubOptions.class));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
+      // Only payload bytes are counted towards publishBatchBytes; attribute sizes are not.
+      int bytes = 0;
+      for (OutgoingMessage message : c.element().getValue()) {
+        if (!pubsubMessages.isEmpty()
+            && bytes + message.elementBytes.length > publishBatchBytes) {
+          // Break large (in bytes) batches into smaller.
+          // (We've already broken by batch size using the trigger in expand, though that may
+          // run slightly over publishBatchSize. We'll consider that ok since the hard limit
+          // from Pubsub is by bytes rather than number of messages.)
+          // BLOCKS until published.
+          publishBatch(pubsubMessages, bytes);
+          pubsubMessages.clear();
+          bytes = 0;
+        }
+        pubsubMessages.add(message);
+        bytes += message.elementBytes.length;
+      }
+      if (!pubsubMessages.isEmpty()) {
+        // BLOCKS until published.
+        publishBatch(pubsubMessages, bytes);
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      try {
+        pubsubClient.close();
+      } finally {
+        // Drop the reference even if close() throws, so a later startBundle does not trip
+        // its "without prior finishBundle" check on a stale client.
+        pubsubClient = null;
+      }
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      String topicString =
+          topic == null ? null
+          : topic.isAccessible() ? topic.get().getPath()
+          : topic.toString();
+      builder.add(DisplayData.item("topic", topicString));
+      builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+    }
+  }
+
+  // ================================================================================
+  // PubsubUnboundedSink
+  // ================================================================================
+
+  /**
+   * Which factory to use for creating Pubsub transport.
+   */
+  private final PubsubClientFactory pubsubFactory;
+
+  /**
+   * Pubsub topic to publish to.
+   */
+  private final ValueProvider<TopicPath> topic;
+
+  /**
+   * Coder for elements. It is the responsibility of the underlying Pubsub transport to
+   * re-encode element bytes if necessary, eg as Base64 strings.
+   */
+  private final Coder<T> elementCoder;
+
+  /**
+   * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
+   * Pubsub message publish timestamp instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Pubsub metadata field holding id for each element, or {@literal null} if need to generate
+   * a unique id ourselves.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
+   * should be a small multiple of the number of available cores. Too small a number results
+   * in too much time lost to blocking Pubsub calls. Too large a number results in too many
+   * single-element batches being sent to Pubsub with high per-batch overhead.
+   */
+  private final int numShards;
+
+  /**
+   * Maximum number of messages per publish.
+   */
+  private final int publishBatchSize;
+
+  /**
+   * Maximum size of a publish batch, in bytes.
+   */
+  private final int publishBatchBytes;
+
+  /**
+   * Longest delay between receiving a message and pushing it to Pubsub.
+   */
+  private final Duration maxLatency;
+
+  /**
+   * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal
+   * null}).
+   */
+  private final RecordIdMethod recordIdMethod;
+
+  /**
+   * In order to publish attributes, a formatting function is used to format the output into
+   * a {@link PubsubIO.PubsubMessage}. When {@literal null}, elements are encoded with
+   * {@link #elementCoder} and published without attributes.
+   */
+  @Nullable
+  private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
+
+  @VisibleForTesting
+  PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      ValueProvider<TopicPath> topic,
+      Coder<T> elementCoder,
+      String timestampLabel,
+      String idLabel,
+      int numShards,
+      int publishBatchSize,
+      int publishBatchBytes,
+      Duration maxLatency,
+      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
+      RecordIdMethod recordIdMethod) {
+    this.pubsubFactory = pubsubFactory;
+    this.topic = topic;
+    this.elementCoder = elementCoder;
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.numShards = numShards;
+    this.publishBatchSize = publishBatchSize;
+    this.publishBatchBytes = publishBatchBytes;
+    this.maxLatency = maxLatency;
+    this.formatFn = formatFn;
+    // Record ids are only generated when an id label is configured.
+    this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
+  }
+
+  public PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      ValueProvider<TopicPath> topic,
+      Coder<T> elementCoder,
+      String timestampLabel,
+      String idLabel,
+      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
+      int numShards) {
+    this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
+        DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
+        formatFn, RecordIdMethod.RANDOM);
+  }
+
+  /**
+   * Get the topic being written to.
+   */
+  public TopicPath getTopic() {
+    return topic.get();
+  }
+
+  /**
+   * Get the {@link ValueProvider} for the topic being written to.
+   */
+  public ValueProvider<TopicPath> getTopicProvider() {
+    return topic;
+  }
+
+  /**
+   * Get the timestamp label.
+   */
+  @Nullable
+  public String getTimestampLabel() {
+    return timestampLabel;
+  }
+
+  /**
+   * Get the id label.
+   */
+  @Nullable
+  public String getIdLabel() {
+    return idLabel;
+  }
+
+  /**
+   * Get the format function used for PubSub attributes.
+   */
+  @Nullable
+  public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() {
+    return formatFn;
+  }
+
+  /**
+   * Get the Coder used to encode output elements.
+   */
+  public Coder<T> getElementCoder() {
+    return elementCoder;
+  }
+
+  @Override
+  public PDone expand(PCollection<T> input) {
+    // Re-window into the global window and fire a pane whenever publishBatchSize elements have
+    // arrived or maxLatency has passed since the first element of the pane, whichever is first.
+    input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows())
+        .triggering(
+            Repeatedly.forever(
+                AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize),
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(maxLatency))))
+        .discardingFiredPanes())
+        // Convert to messages and spread them across numShards keys.
+        .apply("PubsubUnboundedSink.Shard",
+            ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod)))
+        .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
+        // Group per shard so each fired pane becomes one publish batch.
+        .apply(GroupByKey.<Integer, OutgoingMessage>create())
+        .apply("PubsubUnboundedSink.Writer",
+            ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+                publishBatchSize, publishBatchBytes)));
+    return PDone.in(input.getPipeline());
+  }
+}