You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/04 15:23:06 UTC
[3/3] incubator-beam git commit: [BEAM-53] Add PubsubApiaryClient,
PubsubTestClient
[BEAM-53] Add PubsubApiaryClient, PubsubTestClient
* Move PubsubClient and friends out of sdk.io and into sdk.util.
* Add PubsubApiaryClient since gRPC has onerous boot class path
requirements which I don't wish to inflict upon other runners.
* Add PubsubTestClient in preparation for unit testing
PubsubUnbounded{Source,Sink}.
* Unit tests for all of the above.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c509a85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c509a85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c509a85
Branch: refs/heads/master
Commit: 2c509a85671b425521c97894cced0b190b5ee51c
Parents: 3f0eead
Author: Mark Shields <ma...@google.com>
Authored: Tue Apr 26 18:41:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 08:22:47 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/PubsubClient.java | 323 ------------
.../apache/beam/sdk/io/PubsubGrpcClient.java | 403 ---------------
.../java/org/apache/beam/sdk/io/PubsubIO.java | 252 ++++------
.../beam/sdk/util/PubsubApiaryClient.java | 293 +++++++++++
.../org/apache/beam/sdk/util/PubsubClient.java | 489 +++++++++++++++++++
.../apache/beam/sdk/util/PubsubGrpcClient.java | 433 ++++++++++++++++
.../apache/beam/sdk/util/PubsubTestClient.java | 291 +++++++++++
.../org/apache/beam/sdk/util/Transport.java | 4 +
.../org/apache/beam/sdk/io/PubsubIOTest.java | 157 ------
.../beam/sdk/util/PubsubApiaryClientTest.java | 134 +++++
.../apache/beam/sdk/util/PubsubClientTest.java | 189 +++++++
.../beam/sdk/util/PubsubGrpcClientTest.java | 170 +++++++
.../beam/sdk/util/PubsubTestClientTest.java | 97 ++++
13 files changed, 2205 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java
deleted file mode 100644
index f92b480..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * A helper interface for talking to Pubsub via an underlying transport.
- */
-public interface PubsubClient extends AutoCloseable {
- /**
- * Path representing a cloud project id.
- */
- class ProjectPath implements Serializable {
- private final String path;
-
- public ProjectPath(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ProjectPath that = (ProjectPath) o;
-
- return path.equals(that.path);
-
- }
-
- @Override
- public int hashCode() {
- return path.hashCode();
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static ProjectPath fromId(String projectId) {
- return new ProjectPath(String.format("projects/%s", projectId));
- }
- }
-
- /**
- * Path representing a Pubsub subscription.
- */
- class SubscriptionPath implements Serializable {
- private final String path;
-
- public SubscriptionPath(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- public String getV1Beta1Path() {
- String[] splits = path.split("/");
- checkState(splits.length == 4);
- return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SubscriptionPath that = (SubscriptionPath) o;
- return path.equals(that.path);
- }
-
- @Override
- public int hashCode() {
- return path.hashCode();
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static SubscriptionPath fromName(String projectId, String subscriptionName) {
- return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
- projectId, subscriptionName));
- }
- }
-
- /**
- * Path representing a Pubsub topic.
- */
- class TopicPath implements Serializable {
- private final String path;
-
- public TopicPath(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- public String getV1Beta1Path() {
- String[] splits = path.split("/");
- checkState(splits.length == 4);
- return String.format("/topics/%s/%s", splits[1], splits[3]);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TopicPath topicPath = (TopicPath) o;
- return path.equals(topicPath.path);
- }
-
- @Override
- public int hashCode() {
- return path.hashCode();
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static TopicPath fromName(String projectId, String topicName) {
- return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
- }
- }
-
- /**
- * A message to be sent to Pubsub.
- */
- class OutgoingMessage {
- /**
- * Underlying (encoded) element.
- */
- public final byte[] elementBytes;
-
- /**
- * Timestamp for element (ms since epoch).
- */
- public final long timestampMsSinceEpoch;
-
- public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
- this.elementBytes = elementBytes;
- this.timestampMsSinceEpoch = timestampMsSinceEpoch;
- }
- }
-
- /**
- * A message received from Pubsub.
- */
- class IncomingMessage {
- /**
- * Underlying (encoded) element.
- */
- public final byte[] elementBytes;
-
- /**
- * Timestamp for element (ms since epoch). Either Pubsub's processing time,
- * or the custom timestamp associated with the message.
- */
- public final long timestampMsSinceEpoch;
-
- /**
- * Timestamp (in system time) at which we requested the message (ms since epoch).
- */
- public final long requestTimeMsSinceEpoch;
-
- /**
- * Id to pass back to Pubsub to acknowledge receipt of this message.
- */
- public final String ackId;
-
- /**
- * Id to pass to the runner to distinguish this message from all others.
- */
- public final byte[] recordId;
-
- public IncomingMessage(
- byte[] elementBytes,
- long timestampMsSinceEpoch,
- long requestTimeMsSinceEpoch,
- String ackId,
- byte[] recordId) {
- this.elementBytes = elementBytes;
- this.timestampMsSinceEpoch = timestampMsSinceEpoch;
- this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
- this.ackId = ackId;
- this.recordId = recordId;
- }
- }
-
- /**
- * Gracefully close the underlying transport.
- */
- @Override
- void close();
-
-
- /**
- * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
- * published.
- *
- * @throws IOException
- */
- int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException;
-
- /**
- * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
- * Return the received messages, or empty collection if none were available. Does not
- * wait for messages to arrive. Returned messages will record heir request time
- * as {@code requestTimeMsSinceEpoch}.
- *
- * @throws IOException
- */
- Collection<IncomingMessage> pull(
- long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize)
- throws IOException;
-
- /**
- * Acknowldege messages from {@code subscription} with {@code ackIds}.
- *
- * @throws IOException
- */
- void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds) throws IOException;
-
- /**
- * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
- * be {@code deadlineSeconds} from now.
- *
- * @throws IOException
- */
- void modifyAckDeadline(
- SubscriptionPath subscription, Iterable<String> ackIds,
- int deadlineSeconds)
- throws IOException;
-
- /**
- * Create {@code topic}.
- *
- * @throws IOException
- */
- void createTopic(TopicPath topic) throws IOException;
-
- /*
- * Delete {@code topic}.
- *
- * @throws IOException
- */
- void deleteTopic(TopicPath topic) throws IOException;
-
- /**
- * Return a list of topics for {@code project}.
- *
- * @throws IOException
- */
- Collection<TopicPath> listTopics(ProjectPath project) throws IOException;
-
- /**
- * Create {@code subscription} to {@code topic}.
- *
- * @throws IOException
- */
- void createSubscription(
- TopicPath topic, SubscriptionPath subscription,
- int ackDeadlineSeconds) throws IOException;
-
- /**
- * Delete {@code subscription}.
- *
- * @throws IOException
- */
- void deleteSubscription(SubscriptionPath subscription) throws IOException;
-
- /**
- * Return a list of subscriptions for {@code topic} in {@code project}.
- *
- * @throws IOException
- */
- Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java
deleted file mode 100644
index 66fb61f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import org.apache.beam.sdk.options.GcpOptions;
-
-import com.google.api.client.util.DateTime;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.DeleteSubscriptionRequest;
-import com.google.pubsub.v1.DeleteTopicRequest;
-import com.google.pubsub.v1.ListSubscriptionsRequest;
-import com.google.pubsub.v1.ListSubscriptionsResponse;
-import com.google.pubsub.v1.ListTopicsRequest;
-import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
-import com.google.pubsub.v1.Subscription;
-import com.google.pubsub.v1.Topic;
-
-import io.grpc.Channel;
-import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
-import io.grpc.auth.ClientAuthInterceptor;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for talking to Pubsub via grpc.
- */
-public class PubsubGrpcClient implements PubsubClient {
- private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
- private static final int PUBSUB_PORT = 443;
- private static final List<String> PUBSUB_SCOPES =
- Collections.singletonList("https://www.googleapis.com/auth/pubsub");
- private static final int LIST_BATCH_SIZE = 1000;
-
- /**
- * Timeout for grpc calls (in s).
- */
- private static final int TIMEOUT_S = 15;
-
- /**
- * Underlying netty channel, or {@literal null} if closed.
- */
- @Nullable
- private ManagedChannel publisherChannel;
-
- /**
- * Credentials determined from options and environment.
- */
- private final GoogleCredentials credentials;
-
- /**
- * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
- * instead.
- */
- @Nullable
- private final String timestampLabel;
-
- /**
- * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
- */
- @Nullable
- private final String idLabel;
-
- /**
- * Cached stubs, or null if not cached.
- */
- @Nullable
- private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
- private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
-
- private PubsubGrpcClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
- ManagedChannel publisherChannel, GoogleCredentials credentials) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.publisherChannel = publisherChannel;
- this.credentials = credentials;
- }
-
- /**
- * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order
- * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources
- * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use
- * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within
- * message metadata.
- */
- public static PubsubGrpcClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
- GcpOptions options) throws IOException {
- ManagedChannel channel = NettyChannelBuilder
- .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
- .negotiationType(NegotiationType.TLS)
- .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
- .build();
- // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
- // various command line options. It currently only supports the older
- // com.google.api.client.auth.oauth2.Credentials.
- GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
- return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials);
- }
-
- /**
- * Gracefully close the underlying netty channel.
- */
- @Override
- public void close() {
- Preconditions.checkState(publisherChannel != null, "Client has already been closed");
- publisherChannel.shutdown();
- try {
- publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // Ignore.
- Thread.currentThread().interrupt();
- }
- publisherChannel = null;
- cachedPublisherStub = null;
- cachedSubscriberStub = null;
- }
-
- /**
- * Return channel with interceptor for returning credentials.
- */
- private Channel newChannel() throws IOException {
- Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
- ClientAuthInterceptor interceptor =
- new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
- return ClientInterceptors.intercept(publisherChannel, interceptor);
- }
-
- /**
- * Return a stub for making a publish request with a timeout.
- */
- private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException {
- if (cachedPublisherStub == null) {
- cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
- }
- return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
- }
-
- /**
- * Return a stub for making a subscribe request with a timeout.
- */
- private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException {
- if (cachedSubscriberStub == null) {
- cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
- }
- return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
- }
-
- @Override
- public int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages)
- throws IOException {
- PublishRequest.Builder request = PublishRequest.newBuilder()
- .setTopic(topic.getPath());
- for (OutgoingMessage outgoingMessage : outgoingMessages) {
- PubsubMessage.Builder message =
- PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
-
- if (timestampLabel != null) {
- message.getMutableAttributes()
- .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
- }
-
- if (idLabel != null) {
- message.getMutableAttributes()
- .put(idLabel,
- Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
- }
-
- request.addMessages(message);
- }
-
- PublishResponse response = publisherStub().publish(request.build());
- return response.getMessageIdsCount();
- }
-
- @Override
- public Collection<IncomingMessage> pull(
- long requestTimeMsSinceEpoch,
- SubscriptionPath subscription,
- int batchSize) throws IOException {
- PullRequest request = PullRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .setReturnImmediately(true)
- .setMaxMessages(batchSize)
- .build();
- PullResponse response = subscriberStub().pull(request);
- if (response.getReceivedMessagesCount() == 0) {
- return ImmutableList.of();
- }
- List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
- for (ReceivedMessage message : response.getReceivedMessagesList()) {
- PubsubMessage pubsubMessage = message.getMessage();
- Map<String, String> attributes = pubsubMessage.getAttributes();
-
- // Payload.
- byte[] elementBytes = pubsubMessage.getData().toByteArray();
-
- // Timestamp.
- // Start with Pubsub processing time.
- Timestamp timestampProto = pubsubMessage.getPublishTime();
- long timestampMsSinceEpoch = timestampProto.getSeconds() + timestampProto.getNanos() / 1000L;
- if (timestampLabel != null && attributes != null) {
- String timestampString = attributes.get(timestampLabel);
- if (timestampString != null && !timestampString.isEmpty()) {
- try {
- // Try parsing as milliseconds since epoch. Note there is no way to parse a
- // string in RFC 3339 format here.
- // Expected IllegalArgumentException if parsing fails; we use that to fall back
- // to RFC 3339.
- timestampMsSinceEpoch = Long.parseLong(timestampString);
- } catch (IllegalArgumentException e1) {
- try {
- // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
- // IllegalArgumentException if parsing fails, and the caller should handle.
- timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue();
- } catch (IllegalArgumentException e2) {
- // Fallback to Pubsub processing time.
- }
- }
- }
- // else: fallback to Pubsub processing time.
- }
- // else: fallback to Pubsub processing time.
-
- // Ack id.
- String ackId = message.getAckId();
- Preconditions.checkState(ackId != null && !ackId.isEmpty());
-
- // Record id, if any.
- @Nullable byte[] recordId = null;
- if (idLabel != null && attributes != null) {
- String recordIdString = attributes.get(idLabel);
- if (recordIdString != null && !recordIdString.isEmpty()) {
- recordId = recordIdString.getBytes();
- }
- }
- if (recordId == null) {
- recordId = pubsubMessage.getMessageId().getBytes();
- }
-
- incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
- requestTimeMsSinceEpoch, ackId, recordId));
- }
- return incomingMessages;
- }
-
- @Override
- public void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds)
- throws IOException {
- AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .addAllAckIds(ackIds)
- .build();
- subscriberStub().acknowledge(request); // ignore Empty result.
- }
-
- @Override
- public void modifyAckDeadline(
- SubscriptionPath subscription, Iterable<String> ackIds, int
- deadlineSeconds)
- throws IOException {
- ModifyAckDeadlineRequest request =
- ModifyAckDeadlineRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .addAllAckIds(ackIds)
- .setAckDeadlineSeconds(deadlineSeconds)
- .build();
- subscriberStub().modifyAckDeadline(request); // ignore Empty result.
- }
-
- @Override
- public void createTopic(TopicPath topic) throws IOException {
- Topic request = Topic.newBuilder()
- .setName(topic.getPath())
- .build();
- publisherStub().createTopic(request); // ignore Topic result.
- }
-
- @Override
- public void deleteTopic(TopicPath topic) throws IOException {
- DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
- .setTopic(topic.getPath())
- .build();
- publisherStub().deleteTopic(request); // ignore Empty result.
- }
-
- @Override
- public Collection<TopicPath> listTopics(ProjectPath project) throws IOException {
- ListTopicsRequest.Builder request =
- ListTopicsRequest.newBuilder()
- .setProject(project.getPath())
- .setPageSize(LIST_BATCH_SIZE);
- ListTopicsResponse response = publisherStub().listTopics(request.build());
- if (response.getTopicsCount() == 0) {
- return ImmutableList.of();
- }
- List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
- while (true) {
- for (Topic topic : response.getTopicsList()) {
- topics.add(new TopicPath(topic.getName()));
- }
- if (response.getNextPageToken().isEmpty()) {
- break;
- }
- request.setPageToken(response.getNextPageToken());
- response = publisherStub().listTopics(request.build());
- }
- return topics;
- }
-
- @Override
- public void createSubscription(
- TopicPath topic, SubscriptionPath subscription,
- int ackDeadlineSeconds) throws IOException {
- Subscription request = Subscription.newBuilder()
- .setTopic(topic.getPath())
- .setName(subscription.getPath())
- .setAckDeadlineSeconds(ackDeadlineSeconds)
- .build();
- subscriberStub().createSubscription(request); // ignore Subscription result.
- }
-
- @Override
- public void deleteSubscription(SubscriptionPath subscription) throws IOException {
- DeleteSubscriptionRequest request =
- DeleteSubscriptionRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .build();
- subscriberStub().deleteSubscription(request); // ignore Empty result.
- }
-
- @Override
- public Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
- throws IOException {
- ListSubscriptionsRequest.Builder request =
- ListSubscriptionsRequest.newBuilder()
- .setProject(project.getPath())
- .setPageSize(LIST_BATCH_SIZE);
- ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
- if (response.getSubscriptionsCount() == 0) {
- return ImmutableList.of();
- }
- List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
- while (true) {
- for (Subscription subscription : response.getSubscriptionsList()) {
- if (subscription.getTopic().equals(topic.getPath())) {
- subscriptions.add(new SubscriptionPath(subscription.getName()));
- }
- }
- if (response.getNextPageToken().isEmpty()) {
- break;
- }
- request.setPageToken(response.getNextPageToken());
- response = subscriberStub().listSubscriptions(request.build());
- }
- return subscriptions;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 4646461..fa867c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -17,8 +17,7 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -33,25 +32,18 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.PubsubApiaryClient;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
-import com.google.api.client.util.Clock;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Strings;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -61,13 +53,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import javax.annotation.Nullable;
/**
@@ -82,6 +71,9 @@ import javax.annotation.Nullable;
public class PubsubIO {
private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
+ /** Factory for creating pubsub client to manage transport. */
+ private static final PubsubClient.PubsubClientFactory FACTORY = PubsubApiaryClient.FACTORY;
+
/** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
@@ -143,48 +135,6 @@ public class PubsubIO {
}
/**
- * Returns the {@link Instant} that corresponds to the timestamp in the supplied
- * {@link PubsubMessage} under the specified {@code ink label}. See
- * {@link PubsubIO.Read#timestampLabel(String)} for details about how these messages are
- * parsed.
- *
- * <p>The {@link Clock} parameter is used to virtualize time for testing.
- *
- * @throws IllegalArgumentException if the timestamp label is provided, but there is no
- * corresponding attribute in the message or the value provided is not a valid timestamp
- * string.
- * @see PubsubIO.Read#timestampLabel(String)
- */
- @VisibleForTesting
- protected static Instant assignMessageTimestamp(
- PubsubMessage message, @Nullable String label, Clock clock) {
- if (label == null) {
- return new Instant(clock.currentTimeMillis());
- }
-
- // Extract message attributes, defaulting to empty map if null.
- Map<String, String> attributes = firstNonNull(
- message.getAttributes(), ImmutableMap.<String, String>of());
-
- String timestampStr = attributes.get(label);
- checkArgument(timestampStr != null && !timestampStr.isEmpty(),
- "PubSub message is missing a timestamp in label: %s", label);
-
- long millisSinceEpoch;
- try {
- // Try parsing as milliseconds since epoch. Note there is no way to parse a string in
- // RFC 3339 format here.
- // Expected IllegalArgumentException if parsing fails; we use that to fall back to RFC 3339.
- millisSinceEpoch = Long.parseLong(timestampStr);
- } catch (IllegalArgumentException e) {
- // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an IllegalArgumentException
- // if parsing fails, and the caller should handle.
- millisSinceEpoch = DateTime.parseRfc3339(timestampStr).getValue();
- }
- return new Instant(millisSinceEpoch);
- }
-
- /**
* Class representing a Cloud Pub/Sub Subscription.
*/
public static class PubsubSubscription implements Serializable {
@@ -679,8 +629,8 @@ public class PubsubIO {
if (boundedOutput) {
return input.getPipeline().begin()
- .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
- .apply(ParDo.of(new PubsubReader())).setCoder(coder);
+ .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
+ .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder);
} else {
return PCollection.<T>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
@@ -740,86 +690,94 @@ public class PubsubIO {
return maxReadTime;
}
- private class PubsubReader extends DoFn<Void, T> {
+ /**
+ * Default reader when Pubsub subscription has some form of upper bound.
+ * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top of upcoming
+ * PubsubUnboundedSource.
+ * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
+ * service.
+ */
+ private class PubsubBoundedReader extends DoFn<Void, T> {
private static final int DEFAULT_PULL_SIZE = 100;
+ private static final int ACK_TIMEOUT_SEC = 60;
@Override
public void processElement(ProcessContext c) throws IOException {
- Pubsub pubsubClient =
- Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
- .build();
-
- String subscription;
- if (getSubscription() == null) {
- String topic = getTopic().asPath();
- String[] split = topic.split("/");
- subscription =
- "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_"
- + new Random().nextLong();
- Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic);
- try {
- pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
- } catch (Exception e) {
- throw new RuntimeException("Failed to create subscription: ", e);
+ try (PubsubClient pubsubClient =
+ FACTORY.newClient(timestampLabel, idLabel,
+ c.getPipelineOptions().as(PubsubOptions.class))) {
+
+ PubsubClient.SubscriptionPath subscriptionPath;
+ if (getSubscription() == null) {
+ // Create a randomized subscription derived from the topic name.
+ String subscription = getTopic().topic + "_dataflow_" + new Random().nextLong();
+ // The subscription will be registered under this pipeline's project if we know it.
+ // Otherwise we'll fall back to the topic's project.
+ // Note that they don't need to be the same.
+ String project = c.getPipelineOptions().as(PubsubOptions.class).getProject();
+ if (Strings.isNullOrEmpty(project)) {
+ project = getTopic().project;
+ }
+ subscriptionPath = PubsubClient.subscriptionPathFromName(project, subscription);
+ TopicPath topicPath =
+ PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
+ try {
+ pubsubClient.createSubscription(topicPath, subscriptionPath, ACK_TIMEOUT_SEC);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create subscription: ", e);
+ }
+ } else {
+ subscriptionPath =
+ PubsubClient.subscriptionPathFromName(getSubscription().project,
+ getSubscription().subscription);
}
- } else {
- subscription = getSubscription().asPath();
- }
- Instant endTime = (getMaxReadTime() == null)
- ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
+ Instant endTime = (getMaxReadTime() == null)
+ ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
- List<PubsubMessage> messages = new ArrayList<>();
+ List<IncomingMessage> messages = new ArrayList<>();
- Throwable finallyBlockException = null;
- try {
- while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
- && Instant.now().isBefore(endTime)) {
- PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
- if (getMaxNumRecords() > 0) {
- pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
- } else {
- pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
- }
+ Throwable finallyBlockException = null;
+ try {
+ while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
+ && Instant.now().isBefore(endTime)) {
+ int batchSize = DEFAULT_PULL_SIZE;
+ if (getMaxNumRecords() > 0) {
+ batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
+ }
- PullResponse pullResponse =
- pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
- List<String> ackIds = new ArrayList<>();
- if (pullResponse.getReceivedMessages() != null) {
- for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
- messages.add(received.getMessage());
- ackIds.add(received.getAckId());
+ List<IncomingMessage> batchMessages =
+ pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
+ false);
+ List<String> ackIds = new ArrayList<>();
+ for (IncomingMessage message : batchMessages) {
+ messages.add(message);
+ ackIds.add(message.ackId);
+ }
+ if (ackIds.size() != 0) {
+ pubsubClient.acknowledge(subscriptionPath, ackIds);
}
}
-
- if (ackIds.size() != 0) {
- AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
- pubsubClient.projects()
- .subscriptions()
- .acknowledge(subscription, ackRequest)
- .execute();
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
+ } finally {
+ if (getSubscription() == null) {
+ try {
+ pubsubClient.deleteSubscription(subscriptionPath);
+ } catch (Exception e) {
+ finallyBlockException = e;
+ }
}
}
- } catch (IOException e) {
- throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
- } finally {
- if (getTopic() != null) {
- try {
- pubsubClient.projects().subscriptions().delete(subscription).execute();
- } catch (IOException e) {
- finallyBlockException = new RuntimeException("Failed to delete subscription: ", e);
- LOG.error("Failed to delete subscription: ", e);
- }
+ if (finallyBlockException != null) {
+ throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
}
- }
- if (finallyBlockException != null) {
- throw new RuntimeException(finallyBlockException);
- }
- for (PubsubMessage message : messages) {
- c.outputWithTimestamp(
- CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
- assignMessageTimestamp(message, getTimestampLabel(), Clock.SYSTEM));
+ for (IncomingMessage message : messages) {
+ c.outputWithTimestamp(
+ CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes),
+ new Instant(message.timestampMsSinceEpoch));
+ }
}
}
}
@@ -1026,31 +984,28 @@ public class PubsubIO {
return coder;
}
+ /**
+ * Writer to Pubsub which batches messages.
+ * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
+ * service.
+ */
private class PubsubWriter extends DoFn<T, Void> {
private static final int MAX_PUBLISH_BATCH_SIZE = 100;
- private transient List<PubsubMessage> output;
- private transient Pubsub pubsubClient;
+ private transient List<OutgoingMessage> output;
+ private transient PubsubClient pubsubClient;
@Override
- public void startBundle(Context c) {
+ public void startBundle(Context c) throws IOException {
this.output = new ArrayList<>();
- this.pubsubClient =
- Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
- .build();
+ this.pubsubClient = FACTORY.newClient(timestampLabel, idLabel,
+ c.getPipelineOptions().as(PubsubOptions.class));
}
@Override
public void processElement(ProcessContext c) throws IOException {
- PubsubMessage message =
- new PubsubMessage().encodeData(CoderUtils.encodeToByteArray(getCoder(), c.element()));
- if (getTimestampLabel() != null) {
- Map<String, String> attributes = message.getAttributes();
- if (attributes == null) {
- attributes = new HashMap<>();
- message.setAttributes(attributes);
- }
- attributes.put(getTimestampLabel(), String.valueOf(c.timestamp().getMillis()));
- }
+ OutgoingMessage message =
+ new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()),
+ c.timestamp().getMillis());
output.add(message);
if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
@@ -1063,13 +1018,16 @@ public class PubsubIO {
if (!output.isEmpty()) {
publish();
}
+ output = null;
+ pubsubClient.close();
+ pubsubClient = null;
}
private void publish() throws IOException {
- PublishRequest publishRequest = new PublishRequest().setMessages(output);
- pubsubClient.projects().topics()
- .publish(getTopic().asPath(), publishRequest)
- .execute();
+ int n = pubsubClient.publish(
+ PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
+ output);
+ checkState(n == output.size());
output.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
new file mode 100644
index 0000000..f0a9096
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * A Pubsub client using Apiary.
+ */
+public class PubsubApiaryClient extends PubsubClient {
+
+ public static final PubsubClientFactory FACTORY = new PubsubClientFactory() {
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ Pubsub pubsub = new Builder(
+ Transport.getTransport(),
+ Transport.getJsonFactory(),
+ new ChainingHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setRootUrl(options.getPubsubRootUrl())
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
+ .build();
+ return new PubsubApiaryClient(timestampLabel, idLabel, pubsub);
+ }
+ };
+
+ /**
+ * Label to use for custom timestamps, or {@literal null} if the Pubsub publish time should be
+ * used instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Label to use for custom ids, or {@literal null} if Pubsub provided ids should be used.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Underlying Apiary client.
+ */
+ private Pubsub pubsub;
+
+ @VisibleForTesting
+ PubsubApiaryClient(
+ @Nullable String timestampLabel,
+ @Nullable String idLabel,
+ Pubsub pubsub) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.pubsub = pubsub;
+ }
+
+ @Override
+ public void close() {
+ // Nothing to close.
+ }
+
+ @Override
+ public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+ throws IOException {
+ List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
+ for (OutgoingMessage outgoingMessage : outgoingMessages) {
+ PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
+
+ Map<String, String> attributes = pubsubMessage.getAttributes();
+ if ((timestampLabel != null || idLabel != null) && attributes == null) {
+ attributes = new TreeMap<>();
+ pubsubMessage.setAttributes(attributes);
+ }
+
+ if (timestampLabel != null) {
+ attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ }
+
+ if (idLabel != null) {
+ attributes.put(idLabel,
+ Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+ }
+
+ pubsubMessages.add(pubsubMessage);
+ }
+ PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
+ PublishResponse response = pubsub.projects()
+ .topics()
+ .publish(topic.getPath(), request)
+ .execute();
+ return response.getMessageIds().size();
+ }
+
+ /**
+ * Pull up to {@code batchSize} messages from {@code subscription}, converting each into an
+ * {@link IncomingMessage} which records {@code requestTimeMsSinceEpoch} as its request time.
+ * Return an empty list if the response contains no messages.
+ *
+ * @throws IOException if the underlying pull request fails.
+ * @throws IllegalArgumentException if a message's timestamp is missing or cannot be parsed.
+ */
+ @Override
+ public List<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize,
+ boolean returnImmediately) throws IOException {
+ PullRequest request = new PullRequest()
+ .setReturnImmediately(returnImmediately)
+ .setMaxMessages(batchSize);
+ PullResponse response = pubsub.projects()
+ .subscriptions()
+ .pull(subscription.getPath(), request)
+ .execute();
+ if (response.getReceivedMessages() == null || response.getReceivedMessages().isEmpty()) {
+ return ImmutableList.of();
+ }
+ List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
+ for (ReceivedMessage message : response.getReceivedMessages()) {
+ PubsubMessage pubsubMessage = message.getMessage();
+ @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+ // Payload.
+ byte[] elementBytes = pubsubMessage.decodeData();
+
+ // Timestamp: taken from the timestampLabel attribute if one is configured, otherwise from
+ // the Pubsub publish time.
+ long timestampMsSinceEpoch =
+ extractTimestamp(timestampLabel, pubsubMessage.getPublishTime(), attributes);
+
+ // Ack id.
+ String ackId = message.getAckId();
+ checkState(!Strings.isNullOrEmpty(ackId));
+
+ // Record id: prefer the idLabel attribute, falling back to the Pubsub message id.
+ // Always encode as UTF-8 so the id bytes do not depend on the JVM's default charset.
+ @Nullable byte[] recordId = null;
+ if (idLabel != null && attributes != null) {
+ String recordIdString = attributes.get(idLabel);
+ if (!Strings.isNullOrEmpty(recordIdString)) {
+ recordId = recordIdString.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+ }
+ if (recordId == null) {
+ recordId = pubsubMessage.getMessageId().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+
+ incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId));
+ }
+
+ return incomingMessages;
+ }
+
+ @Override
+ public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
+ AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+ pubsub.projects()
+ .subscriptions()
+ .acknowledge(subscription.getPath(), request)
+ .execute(); // ignore Empty result.
+ }
+
+ @Override
+ public void modifyAckDeadline(
+ SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+ throws IOException {
+ ModifyAckDeadlineRequest request =
+ new ModifyAckDeadlineRequest().setAckIds(ackIds)
+ .setAckDeadlineSeconds(deadlineSeconds);
+ pubsub.projects()
+ .subscriptions()
+ .modifyAckDeadline(subscription.getPath(), request)
+ .execute(); // ignore Empty result.
+ }
+
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+ pubsub.projects()
+ .topics()
+ .create(topic.getPath(), new Topic())
+ .execute(); // ignore Topic result.
+ }
+
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+ pubsub.projects()
+ .topics()
+ .delete(topic.getPath())
+ .execute(); // ignore Empty result.
+ }
+
+ /**
+ * Return all topics for {@code project}, following the response's {@code nextPageToken}
+ * until the listing is exhausted. Return an empty list if the project has no topics.
+ *
+ * @throws IOException if any of the underlying list requests fails.
+ */
+ @Override
+ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+ Pubsub.Projects.Topics.List request = pubsub.projects()
+ .topics()
+ .list(project.getPath());
+ List<TopicPath> topics = new ArrayList<>();
+ String pageToken = null;
+ do {
+ // A single response holds at most one page of results; keep requesting pages until the
+ // service stops returning a continuation token.
+ ListTopicsResponse response = request.setPageToken(pageToken).execute();
+ if (response.getTopics() != null) {
+ for (Topic topic : response.getTopics()) {
+ topics.add(topicPathFromPath(topic.getName()));
+ }
+ }
+ pageToken = response.getNextPageToken();
+ } while (!Strings.isNullOrEmpty(pageToken));
+ return topics;
+ }
+
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription,
+ int ackDeadlineSeconds) throws IOException {
+ Subscription request = new Subscription()
+ .setTopic(topic.getPath())
+ .setAckDeadlineSeconds(ackDeadlineSeconds);
+ pubsub.projects()
+ .subscriptions()
+ .create(subscription.getPath(), request)
+ .execute(); // ignore Subscription result.
+ }
+
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+ pubsub.projects()
+ .subscriptions()
+ .delete(subscription.getPath())
+ .execute(); // ignore Empty result.
+ }
+
+ /**
+ * Return all subscriptions in {@code project} which are attached to {@code topic}, following
+ * the response's {@code nextPageToken} until the listing is exhausted. Return an empty list
+ * if there are no matching subscriptions.
+ *
+ * @throws IOException if any of the underlying list requests fails.
+ */
+ @Override
+ public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException {
+ Pubsub.Projects.Subscriptions.List request = pubsub.projects()
+ .subscriptions()
+ .list(project.getPath());
+ List<SubscriptionPath> subscriptions = new ArrayList<>();
+ String pageToken = null;
+ do {
+ // A single response holds at most one page of results; keep requesting pages until the
+ // service stops returning a continuation token.
+ ListSubscriptionsResponse response = request.setPageToken(pageToken).execute();
+ if (response.getSubscriptions() != null) {
+ for (Subscription subscription : response.getSubscriptions()) {
+ // Compare from the known non-null side so a subscription without a topic is skipped
+ // rather than triggering a NullPointerException.
+ if (topic.getPath().equals(subscription.getTopic())) {
+ subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+ }
+ }
+ }
+ pageToken = response.getNextPageToken();
+ } while (!Strings.isNullOrEmpty(pageToken));
+ return subscriptions;
+ }
+
+ @Override
+ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+ Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+ return response.getAckDeadlineSeconds();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
new file mode 100644
index 0000000..a44329d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.api.client.util.DateTime;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * An (abstract) helper class for talking to Pubsub via an underlying transport.
+ */
+public abstract class PubsubClient implements Closeable {
+ /**
+ * Factory for creating clients.
+ */
+ public interface PubsubClientFactory extends Serializable {
+ /**
+ * Construct a new Pubsub client. It should be closed via {@link #close} in order
+ * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
+ * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
+ * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
+ * timestamps/ids within message metadata.
+ */
+ PubsubClient newClient(
+ @Nullable String timestampLabel,
+ @Nullable String idLabel,
+ PubsubOptions options) throws IOException;
+ }
+
+ /**
+ * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}.
+ * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException}
+ * if timestamp cannot be recognized.
+ */
+ @Nullable
+ private static Long asMsSinceEpoch(@Nullable String timestamp) {
+ if (Strings.isNullOrEmpty(timestamp)) {
+ return null;
+ }
+ try {
+ // Try parsing as milliseconds since epoch. Note there is no way to parse a
+ // string in RFC 3339 format here.
+ // Expected IllegalArgumentException if parsing fails; we use that to fall back
+ // to RFC 3339.
+ return Long.parseLong(timestamp);
+ } catch (IllegalArgumentException e1) {
+ // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
+ // IllegalArgumentException if parsing fails, and the caller should handle.
+ return DateTime.parseRfc3339(timestamp).getValue();
+ }
+ }
+
+ /**
+ * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
+ * attributes} and {@code pubsubTimestamp}.
+ * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
+ * that label, and the value of that label will be taken as the timestamp.
+ * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
+ * pubsubTimestamp}. Throw {@link IllegalArgumentException} if the timestamp cannot be
+ * recognized as a ms-since-unix-epoch or RFC3339 time.
+ *
+ * @throws IllegalArgumentException if the timestamp is missing or cannot be parsed.
+ */
+ protected static long extractTimestamp(
+ @Nullable String timestampLabel,
+ @Nullable String pubsubTimestamp,
+ @Nullable Map<String, String> attributes) {
+ Long timestampMsSinceEpoch;
+ if (Strings.isNullOrEmpty(timestampLabel)) {
+ timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
+ checkArgument(timestampMsSinceEpoch != null,
+ "Cannot interpret PubSub publish timestamp: %s",
+ pubsubTimestamp);
+ } else {
+ String value = attributes == null ? null : attributes.get(timestampLabel);
+ checkArgument(value != null,
+ "PubSub message is missing a value for timestamp label %s",
+ timestampLabel);
+ timestampMsSinceEpoch = asMsSinceEpoch(value);
+ checkArgument(timestampMsSinceEpoch != null,
+ "Cannot interpret value of label %s as timestamp: %s",
+ timestampLabel, value);
+ }
+ return timestampMsSinceEpoch;
+ }
+
+ /**
+ * Path representing a cloud project id.
+ */
+ public static class ProjectPath implements Serializable {
+ private final String path;
+
+ ProjectPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ProjectPath that = (ProjectPath) o;
+
+ return path.equals(that.path);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+ }
+
+ public static ProjectPath projectPathFromPath(String path) {
+ return new ProjectPath(path);
+ }
+
+ public static ProjectPath projectPathFromId(String projectId) {
+ return new ProjectPath(String.format("projects/%s", projectId));
+ }
+
+ /**
+ * Path representing a Pubsub subscription.
+ */
+ public static class SubscriptionPath implements Serializable {
+ private final String path;
+
+ SubscriptionPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getV1Beta1Path() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed subscription path %s", path);
+ return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubscriptionPath that = (SubscriptionPath) o;
+ return path.equals(that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+ }
+
+ public static SubscriptionPath subscriptionPathFromPath(String path) {
+ return new SubscriptionPath(path);
+ }
+
+ public static SubscriptionPath subscriptionPathFromName(
+ String projectId, String subscriptionName) {
+ return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
+ projectId, subscriptionName));
+ }
+
+ /**
+ * Path representing a Pubsub topic.
+ */
+ public static class TopicPath implements Serializable {
+ private final String path;
+
+ TopicPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getV1Beta1Path() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed topic path %s", path);
+ return String.format("/topics/%s/%s", splits[1], splits[3]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopicPath topicPath = (TopicPath) o;
+ return path.equals(topicPath.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+ }
+
+ public static TopicPath topicPathFromPath(String path) {
+ return new TopicPath(path);
+ }
+
+ public static TopicPath topicPathFromName(String projectId, String topicName) {
+ return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
+ }
+
+ /**
+ * A message to be sent to Pubsub.
+ * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+ * Java serialization is never used for non-test clients.
+ */
+ public static class OutgoingMessage implements Serializable {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ /**
+ * Timestamp for element (ms since epoch).
+ */
+ public final long timestampMsSinceEpoch;
+
+ public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
+ this.elementBytes = elementBytes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OutgoingMessage that = (OutgoingMessage) o;
+
+ if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
+ return false;
+ }
+ return Arrays.equals(elementBytes, that.elementBytes);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch);
+ }
+ }
+
+ /**
+ * A message received from Pubsub.
+ * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+ * Java serialization is never used for non-test clients.
+ */
+ public static class IncomingMessage implements Serializable {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ /**
+ * Timestamp for element (ms since epoch). Either Pubsub's publish time,
+ * or the custom timestamp associated with the message.
+ */
+ public final long timestampMsSinceEpoch;
+
+ /**
+ * Timestamp (in system time) at which we requested the message (ms since epoch).
+ */
+ public final long requestTimeMsSinceEpoch;
+
+ /**
+ * Id to pass back to Pubsub to acknowledge receipt of this message.
+ */
+ public final String ackId;
+
+ /**
+ * Id to pass to the runner to distinguish this message from all others.
+ */
+ public final byte[] recordId;
+
+ public IncomingMessage(
+ byte[] elementBytes,
+ long timestampMsSinceEpoch,
+ long requestTimeMsSinceEpoch,
+ String ackId,
+ byte[] recordId) {
+ this.elementBytes = elementBytes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+ this.ackId = ackId;
+ this.recordId = recordId;
+ }
+
+ public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+ return new IncomingMessage(elementBytes, timestampMsSinceEpoch, requestTimeMsSinceEpoch,
+ ackId, recordId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ IncomingMessage that = (IncomingMessage) o;
+
+ if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
+ return false;
+ }
+ if (requestTimeMsSinceEpoch != that.requestTimeMsSinceEpoch) {
+ return false;
+ }
+ if (!Arrays.equals(elementBytes, that.elementBytes)) {
+ return false;
+ }
+ if (!ackId.equals(that.ackId)) {
+ return false;
+ }
+ return Arrays.equals(recordId, that.recordId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch,
+ ackId, Arrays.hashCode(recordId));
+ }
+ }
+
+ /**
+ * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+ * published.
+ *
+ * @throws IOException
+ */
+ public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+ throws IOException;
+
+ /**
+ * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+ * Return the received messages, or empty collection if none were available. Does not
+ * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
+ * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
+ *
+ * @throws IOException
+ */
+ public abstract List<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize,
+ boolean returnImmediately)
+ throws IOException;
+
+ /**
+ * Acknowledge messages from {@code subscription} with {@code ackIds}.
+ *
+ * @throws IOException
+ */
+ public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+ throws IOException;
+
+ /**
+ * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+ * be {@code deadlineSeconds} from now.
+ *
+ * @throws IOException
+ */
+ public abstract void modifyAckDeadline(
+ SubscriptionPath subscription, List<String> ackIds,
+ int deadlineSeconds) throws IOException;
+
+ /**
+ * Create {@code topic}.
+ *
+ * @throws IOException
+ */
+ public abstract void createTopic(TopicPath topic) throws IOException;
+
+ /**
+ * Delete {@code topic}.
+ *
+ * @throws IOException
+ */
+ public abstract void deleteTopic(TopicPath topic) throws IOException;
+
+ /**
+ * Return a list of topics for {@code project}.
+ *
+ * @throws IOException
+ */
+ public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+ /**
+ * Create {@code subscription} to {@code topic}.
+ *
+ * @throws IOException
+ */
+ public abstract void createSubscription(
+ TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
+
+ /**
+ * Delete {@code subscription}.
+ *
+ * @throws IOException
+ */
+ public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return a list of subscriptions for {@code topic} in {@code project}.
+ *
+ * @throws IOException
+ */
+ public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException;
+
+ /**
+ * Return the ack deadline, in seconds, for {@code subscription}.
+ *
+ * @throws IOException
+ */
+ public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
new file mode 100644
index 0000000..b3c1b8f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeleteSubscriptionRequest;
+import com.google.pubsub.v1.DeleteTopicRequest;
+import com.google.pubsub.v1.GetSubscriptionRequest;
+import com.google.pubsub.v1.ListSubscriptionsRequest;
+import com.google.pubsub.v1.ListSubscriptionsResponse;
+import com.google.pubsub.v1.ListTopicsRequest;
+import com.google.pubsub.v1.ListTopicsResponse;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for talking to Pubsub via grpc.
+ */
+public class PubsubGrpcClient extends PubsubClient {
+ private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; // Host FACTORY connects to.
+ private static final int PUBSUB_PORT = 443; // Port FACTORY connects to.
+ private static final List<String> PUBSUB_SCOPES =
+ Collections.singletonList("https://www.googleapis.com/auth/pubsub"); // NOTE(review): not referenced elsewhere in this class.
+ private static final int LIST_BATCH_SIZE = 1000; // Page size requested by listTopics/listSubscriptions.
+
+ private static final int DEFAULT_TIMEOUT_S = 15; // Timeout (in s) given to clients built by FACTORY.
+
+ /**
+  * Factory for clients which talk to {@code PUBSUB_ADDRESS}:{@code PUBSUB_PORT} over a TLS
+  * netty channel, use the application default credentials and apply a
+  * {@code DEFAULT_TIMEOUT_S} second timeout. The {@code options} argument is currently unused.
+  */
+ public static final PubsubClientFactory FACTORY =
+     new PubsubClientFactory() {
+       @Override
+       public PubsubClient newClient(
+           @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+           throws IOException {
+         // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
+         // various command line options. It currently only supports the older
+         // com.google.api.client.auth.oauth2.Credentials.
+         //
+         // Resolve the credentials BEFORE opening the channel: if the lookup throws there is
+         // then no channel left behind which nobody would ever shut down.
+         GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+         ManagedChannel channel = NettyChannelBuilder
+             .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+             .negotiationType(NegotiationType.TLS)
+             .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+             .build();
+         return new PubsubGrpcClient(timestampLabel,
+                                     idLabel,
+                                     DEFAULT_TIMEOUT_S,
+                                     channel,
+                                     credentials,
+                                     null /* publisher stub */,
+                                     null /* subscriber stub */);
+       }
+     };
+
+ /**
+  * Timeout for grpc calls (in s). When not positive, no per-call deadline is applied to the
+  * stubs and {@code close()} does not wait for the channel to terminate.
+  */
+ private final int timeoutSec;
+
+ /**
+  * Underlying netty channel, or {@literal null} if closed.
+  */
+ @Nullable
+ private ManagedChannel publisherChannel;
+
+ /**
+  * Credentials handed to the auth interceptor wrapped around the channel.
+  */
+ private final GoogleCredentials credentials;
+
+ /**
+  * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+  * instead.
+  */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+  * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+  */
+ @Nullable
+ private final String idLabel;
+
+ /**
+  * Cached publisher stub, or {@literal null} if not (or no longer) cached.
+  */
+ @Nullable
+ private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
+
+ /**
+  * Cached subscriber stub, or {@literal null} if not (or no longer) cached.
+  */
+ @Nullable
+ private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
+
+ /**
+  * Construct a client over an already-built channel.
+  *
+  * @param timestampLabel label to use for custom timestamps, or {@literal null} to use the
+  *     Pubsub publish time
+  * @param idLabel label to use for custom ids, or {@literal null} to use Pubsub provided ids
+  * @param timeoutSec timeout for grpc calls (in s); not positive means no timeout
+  * @param publisherChannel channel all calls are sent over; shut down by {@code close()}
+  * @param credentials credentials for the auth interceptor wrapped around the channel
+  * @param cachedPublisherStub publisher stub to use, or {@literal null} to create one on demand
+  * @param cachedSubscriberStub subscriber stub to use, or {@literal null} to create one on
+  *     demand
+  */
+ @VisibleForTesting
+ PubsubGrpcClient(
+     @Nullable String timestampLabel,
+     @Nullable String idLabel,
+     int timeoutSec,
+     ManagedChannel publisherChannel,
+     GoogleCredentials credentials,
+     @Nullable PublisherGrpc.PublisherBlockingStub cachedPublisherStub,
+     @Nullable SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub) {
+   this.timestampLabel = timestampLabel;
+   this.idLabel = idLabel;
+   this.timeoutSec = timeoutSec;
+   this.publisherChannel = publisherChannel;
+   this.credentials = credentials;
+   this.cachedPublisherStub = cachedPublisherStub;
+   this.cachedSubscriberStub = cachedSubscriberStub;
+ }
+
+ /**
+  * Shut down the underlying netty channel, waiting up to {@code timeoutSec} seconds (when
+  * positive) for it to terminate. Calling this on an already closed client does nothing.
+  */
+ @Override
+ public void close() {
+   ManagedChannel channel = publisherChannel;
+   if (channel == null) {
+     // Nothing left to close.
+     return;
+   }
+   // Forget the channel up front so the client counts as closed even if the
+   // shutdown below throws.
+   publisherChannel = null;
+   // Drop the stubs so they can be garbage collected.
+   cachedPublisherStub = null;
+   cachedSubscriberStub = null;
+   // Ask for a graceful shutdown.
+   channel.shutdown();
+   if (timeoutSec <= 0) {
+     return;
+   }
+   try {
+     channel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
+   } catch (InterruptedException e) {
+     // Stop waiting, but keep the interrupt visible to the caller.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Return the underlying channel wrapped with a credentials-backed auth interceptor.
+  *
+  * @throws IllegalStateException if this client has been closed
+  */
+ private Channel newChannel() throws IOException {
+   checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
+   // NOTE(review): every call creates a single-thread executor which this class never shuts
+   // down; consider owning one executor per client and shutting it down in close().
+   return ClientInterceptors.intercept(
+       publisherChannel,
+       new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor()));
+ }
+
+ /**
+  * Return the publisher stub, creating and caching it on first use. When {@code timeoutSec}
+  * is positive the returned stub carries a deadline of that many seconds from now.
+  */
+ private PublisherBlockingStub publisherStub() throws IOException {
+   PublisherBlockingStub stub = cachedPublisherStub;
+   if (stub == null) {
+     stub = PublisherGrpc.newBlockingStub(newChannel());
+     cachedPublisherStub = stub;
+   }
+   return timeoutSec > 0 ? stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS) : stub;
+ }
+
+ /**
+  * Return the subscriber stub, creating and caching it on first use. When {@code timeoutSec}
+  * is positive the returned stub carries a deadline of that many seconds from now.
+  */
+ private SubscriberBlockingStub subscriberStub() throws IOException {
+   SubscriberBlockingStub stub = cachedSubscriberStub;
+   if (stub == null) {
+     stub = SubscriberGrpc.newBlockingStub(newChannel());
+     cachedSubscriberStub = stub;
+   }
+   return timeoutSec > 0 ? stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS) : stub;
+ }
+
+ /**
+  * Publish {@code outgoingMessages} to {@code topic} in a single request.
+  *
+  * <p>When a timestamp label is configured each message carries its timestamp (ms since
+  * epoch) under that attribute. When an id label is configured each message carries the
+  * murmur3 128-bit hash of its payload under that attribute.
+  *
+  * @return the number of message ids reported in the publish response
+  */
+ @Override
+ public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+     throws IOException {
+   PublishRequest.Builder requestBuilder = PublishRequest.newBuilder();
+   requestBuilder.setTopic(topic.getPath());
+
+   for (OutgoingMessage outgoing : outgoingMessages) {
+     PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder();
+     messageBuilder.setData(ByteString.copyFrom(outgoing.elementBytes));
+
+     if (timestampLabel != null) {
+       String timestampValue = String.valueOf(outgoing.timestampMsSinceEpoch);
+       messageBuilder.getMutableAttributes().put(timestampLabel, timestampValue);
+     }
+
+     if (idLabel != null) {
+       String idValue = Hashing.murmur3_128().hashBytes(outgoing.elementBytes).toString();
+       messageBuilder.getMutableAttributes().put(idLabel, idValue);
+     }
+
+     requestBuilder.addMessages(messageBuilder);
+   }
+
+   PublishResponse response = publisherStub().publish(requestBuilder.build());
+   return response.getMessageIdsCount();
+ }
+
+ /**
+  * Pull up to {@code batchSize} messages from {@code subscription}.
+  *
+  * @param requestTimeMsSinceEpoch request time recorded on every returned message
+  * @param subscription subscription to pull from
+  * @param batchSize maximum number of messages to request
+  * @param returnImmediately passed through to the pull request's {@code returnImmediately}
+  * @return the received messages, or an empty list if the response contained none
+  * @throws IllegalStateException if a received message has no ack id
+  */
+ @Override
+ public List<IncomingMessage> pull(
+     long requestTimeMsSinceEpoch,
+     SubscriptionPath subscription,
+     int batchSize,
+     boolean returnImmediately) throws IOException {
+   PullRequest request = PullRequest.newBuilder()
+       .setSubscription(subscription.getPath())
+       .setReturnImmediately(returnImmediately)
+       .setMaxMessages(batchSize)
+       .build();
+   PullResponse response = subscriberStub().pull(request);
+   if (response.getReceivedMessagesCount() == 0) {
+     return ImmutableList.of();
+   }
+   List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
+   for (ReceivedMessage message : response.getReceivedMessagesList()) {
+     PubsubMessage pubsubMessage = message.getMessage();
+     @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+     // Payload.
+     byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
+     // Timestamp.
+     String pubsubTimestampString = null;
+     Timestamp timestampProto = pubsubMessage.getPublishTime();
+     if (timestampProto != null) {
+       // The publish time arrives as (seconds, nanos) but is consumed below as ms since
+       // epoch, so scale seconds up to ms and nanos down to ms before adding them.
+       pubsubTimestampString = String.valueOf(
+           timestampProto.getSeconds() * 1000L + timestampProto.getNanos() / 1000000L);
+     }
+     long timestampMsSinceEpoch =
+         extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+
+     // Ack id.
+     String ackId = message.getAckId();
+     checkState(!Strings.isNullOrEmpty(ackId));
+
+     // Record id, if any. Falls back to the Pubsub message id when no id label is configured
+     // or the attribute is missing/empty.
+     // NOTE(review): getBytes() encodes with the platform default charset.
+     @Nullable byte[] recordId = null;
+     if (idLabel != null && attributes != null) {
+       String recordIdString = attributes.get(idLabel);
+       if (recordIdString != null && !recordIdString.isEmpty()) {
+         recordId = recordIdString.getBytes();
+       }
+     }
+     if (recordId == null) {
+       recordId = pubsubMessage.getMessageId().getBytes();
+     }
+
+     incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+         requestTimeMsSinceEpoch, ackId, recordId));
+   }
+   return incomingMessages;
+ }
+
+ /**
+  * Acknowledge the messages identified by {@code ackIds} on {@code subscription}.
+  */
+ @Override
+ public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+     throws IOException {
+   AcknowledgeRequest.Builder builder = AcknowledgeRequest.newBuilder();
+   builder.setSubscription(subscription.getPath());
+   builder.addAllAckIds(ackIds);
+   AcknowledgeRequest ackRequest = builder.build();
+   // The call returns Empty, so there is nothing to hand back.
+   subscriberStub().acknowledge(ackRequest);
+ }
+
+ /**
+  * Request an ack deadline of {@code deadlineSeconds} for the messages identified by
+  * {@code ackIds} on {@code subscription}.
+  */
+ @Override
+ public void modifyAckDeadline(
+     SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+     throws IOException {
+   ModifyAckDeadlineRequest.Builder builder = ModifyAckDeadlineRequest.newBuilder();
+   builder.setSubscription(subscription.getPath());
+   builder.addAllAckIds(ackIds);
+   builder.setAckDeadlineSeconds(deadlineSeconds);
+   ModifyAckDeadlineRequest deadlineRequest = builder.build();
+   // The call returns Empty, so there is nothing to hand back.
+   subscriberStub().modifyAckDeadline(deadlineRequest);
+ }
+
+ /**
+  * Create {@code topic}; the {@code Topic} returned by the service is discarded.
+  */
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+   Topic.Builder builder = Topic.newBuilder();
+   builder.setName(topic.getPath());
+   Topic newTopic = builder.build();
+   publisherStub().createTopic(newTopic);
+ }
+
+ /**
+  * Delete {@code topic}.
+  */
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+   DeleteTopicRequest.Builder builder = DeleteTopicRequest.newBuilder();
+   builder.setTopic(topic.getPath());
+   DeleteTopicRequest deletion = builder.build();
+   // The call returns Empty, so there is nothing to hand back.
+   publisherStub().deleteTopic(deletion);
+ }
+
+ /**
+  * Return the paths of all topics in {@code project}, fetching pages of up to
+  * {@code LIST_BATCH_SIZE} topics until the service stops returning a next page token.
+  *
+  * @return the topics found; empty if the project has none
+  */
+ @Override
+ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+   ListTopicsRequest.Builder request =
+       ListTopicsRequest.newBuilder()
+           .setProject(project.getPath())
+           .setPageSize(LIST_BATCH_SIZE);
+   List<TopicPath> topics = new ArrayList<>();
+   while (true) {
+     ListTopicsResponse response = publisherStub().listTopics(request.build());
+     for (Topic topic : response.getTopicsList()) {
+       topics.add(topicPathFromPath(topic.getName()));
+     }
+     // Only the page token decides whether more results exist: an empty page (including
+     // the first one) which still carries a token must not end the listing.
+     String nextPageToken = response.getNextPageToken();
+     if (nextPageToken.isEmpty()) {
+       break;
+     }
+     request.setPageToken(nextPageToken);
+   }
+   return topics;
+ }
+
+ /**
+  * Create {@code subscription} on {@code topic} with the given ack deadline; the
+  * {@code Subscription} returned by the service is discarded.
+  */
+ @Override
+ public void createSubscription(
+     TopicPath topic, SubscriptionPath subscription,
+     int ackDeadlineSeconds) throws IOException {
+   Subscription.Builder builder = Subscription.newBuilder();
+   builder.setTopic(topic.getPath());
+   builder.setName(subscription.getPath());
+   builder.setAckDeadlineSeconds(ackDeadlineSeconds);
+   Subscription newSubscription = builder.build();
+   subscriberStub().createSubscription(newSubscription);
+ }
+
+ /**
+  * Delete {@code subscription}.
+  */
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+   DeleteSubscriptionRequest.Builder builder = DeleteSubscriptionRequest.newBuilder();
+   builder.setSubscription(subscription.getPath());
+   DeleteSubscriptionRequest deletion = builder.build();
+   // The call returns Empty, so there is nothing to hand back.
+   subscriberStub().deleteSubscription(deletion);
+ }
+
+ /**
+  * Return the paths of the subscriptions in {@code project} which are attached to
+  * {@code topic}. All subscriptions of the project are fetched, in pages of up to
+  * {@code LIST_BATCH_SIZE}, and filtered by topic on the client side.
+  *
+  * @return the matching subscriptions; empty if there are none
+  */
+ @Override
+ public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+     throws IOException {
+   ListSubscriptionsRequest.Builder request =
+       ListSubscriptionsRequest.newBuilder()
+           .setProject(project.getPath())
+           .setPageSize(LIST_BATCH_SIZE);
+   List<SubscriptionPath> subscriptions = new ArrayList<>();
+   while (true) {
+     ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
+     for (Subscription subscription : response.getSubscriptionsList()) {
+       if (subscription.getTopic().equals(topic.getPath())) {
+         subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+       }
+     }
+     // Only the page token decides whether more results exist: an empty page (including
+     // the first one) which still carries a token must not end the listing.
+     String nextPageToken = response.getNextPageToken();
+     if (nextPageToken.isEmpty()) {
+       break;
+     }
+     request.setPageToken(nextPageToken);
+   }
+   return subscriptions;
+ }
+
+ /**
+  * Look up {@code subscription} and return its ack deadline, in seconds.
+  */
+ @Override
+ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+   GetSubscriptionRequest.Builder builder = GetSubscriptionRequest.newBuilder();
+   builder.setSubscription(subscription.getPath());
+   GetSubscriptionRequest lookup = builder.build();
+   return subscriberStub().getSubscription(lookup).getAckDeadlineSeconds();
+ }
+}