You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/12 20:19:54 UTC
[1/2] incubator-beam git commit: Google Cloud Pubsub client with gRPC
implementation
Repository: incubator-beam
Updated Branches:
refs/heads/master 9f41ddbb3 -> 46c82acf5
Google Cloud Pubsub client with gRPC implementation
This Pubsub client is specialized to Beam's use cases, such as
a distinguished attribute label that contains event timestamps.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06d47099
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06d47099
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06d47099
Branch: refs/heads/master
Commit: 06d470998f01d366364a1d98dfbb73deecdbf797
Parents: d2b659b
Author: Mark Shields <ma...@google.com>
Authored: Thu Apr 7 11:06:51 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 12 11:03:22 2016 -0700
----------------------------------------------------------------------
pom.xml | 1 +
sdks/java/core/pom.xml | 34 ++
.../cloud/dataflow/sdk/io/PubsubClient.java | 322 +++++++++++++++
.../cloud/dataflow/sdk/io/PubsubGrpcClient.java | 401 +++++++++++++++++++
4 files changed, 758 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06d47099/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 591b67a..243cff3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<avro.version>1.7.7</avro.version>
<bigquery.version>v2-rev248-1.21.0</bigquery.version>
<bigtable.version>0.2.3</bigtable.version>
+ <pubsubgrpc.version>0.0.2</pubsubgrpc.version>
<clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
<dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06d47099/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6595a21..9762136 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -390,6 +390,40 @@
</dependency>
<dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-oauth2-http</artifactId>
+ <version>0.3.1</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>4.1.0.Beta8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.api.grpc</groupId>
+ <artifactId>grpc-pubsub-v1</artifactId>
+ <version>${pubsubgrpc.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-protos</artifactId>
<version>${bigtable.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06d47099/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
new file mode 100644
index 0000000..e5b8a39
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A helper interface for talking to Pubsub via an underlying transport.
+ */
+public interface PubsubClient extends AutoCloseable {
+ /**
+ * Path representing a cloud project id.
+ */
+ class ProjectPath implements Serializable {
+ private final String path;
+
+ public ProjectPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ProjectPath that = (ProjectPath) o;
+
+ return path.equals(that.path);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ public static ProjectPath fromId(String projectId) {
+ return new ProjectPath(String.format("projects/%s", projectId));
+ }
+ }
+
+ /**
+ * Path representing a Pubsub subscription.
+ */
+ class SubscriptionPath implements Serializable {
+ private final String path;
+
+ public SubscriptionPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getV1Beta1Path() {
+ String[] splits = path.split("/");
+ Preconditions.checkState(splits.length == 4);
+ return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubscriptionPath that = (SubscriptionPath) o;
+ return path.equals(that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ public static SubscriptionPath fromName(String projectId, String subscriptionName) {
+ return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
+ projectId, subscriptionName));
+ }
+ }
+
+ /**
+ * Path representing a Pubsub topic.
+ */
+ class TopicPath implements Serializable {
+ private final String path;
+
+ public TopicPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getV1Beta1Path() {
+ String[] splits = path.split("/");
+ Preconditions.checkState(splits.length == 4);
+ return String.format("/topics/%s/%s", splits[1], splits[3]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopicPath topicPath = (TopicPath) o;
+ return path.equals(topicPath.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ public static TopicPath fromName(String projectId, String topicName) {
+ return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
+ }
+ }
+
+ /**
+ * A message to be sent to Pubsub.
+ */
+ class OutgoingMessage {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ /**
+ * Timestamp for element (ms since epoch).
+ */
+ public final long timestampMsSinceEpoch;
+
+ public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
+ this.elementBytes = elementBytes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ }
+ }
+
+ /**
+ * A message received from Pubsub.
+ */
+ class IncomingMessage {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ /**
+ * Timestamp for element (ms since epoch). Either Pubsub's processing time,
+ * or the custom timestamp associated with the message.
+ */
+ public final long timestampMsSinceEpoch;
+
+ /**
+ * Timestamp (in system time) at which we requested the message (ms since epoch).
+ */
+ public final long requestTimeMsSinceEpoch;
+
+ /**
+ * Id to pass back to Pubsub to acknowledge receipt of this message.
+ */
+ public final String ackId;
+
+ /**
+ * Id to pass to the runner to distinguish this message from all others.
+ */
+ public final byte[] recordId;
+
+ public IncomingMessage(
+ byte[] elementBytes,
+ long timestampMsSinceEpoch,
+ long requestTimeMsSinceEpoch,
+ String ackId,
+ byte[] recordId) {
+ this.elementBytes = elementBytes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+ this.ackId = ackId;
+ this.recordId = recordId;
+ }
+ }
+
+ /**
+ * Gracefully close the underlying transport.
+ */
+ @Override
+ void close();
+
+
+ /**
+ * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+ * published.
+ *
+ * @throws IOException
+ */
+ int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException;
+
+ /**
+ * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+ * Return the received messages, or empty collection if none were available. Does not
+ * wait for messages to arrive. Returned messages will record heir request time
+ * as {@code requestTimeMsSinceEpoch}.
+ *
+ * @throws IOException
+ */
+ Collection<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize)
+ throws IOException;
+
+ /**
+ * Acknowldege messages from {@code subscription} with {@code ackIds}.
+ *
+ * @throws IOException
+ */
+ void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds) throws IOException;
+
+ /**
+ * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+ * be {@code deadlineSeconds} from now.
+ *
+ * @throws IOException
+ */
+ void modifyAckDeadline(
+ SubscriptionPath subscription, Iterable<String> ackIds,
+ int deadlineSeconds)
+ throws IOException;
+
+ /**
+ * Create {@code topic}.
+ *
+ * @throws IOException
+ */
+ void createTopic(TopicPath topic) throws IOException;
+
+ /*
+ * Delete {@code topic}.
+ *
+ * @throws IOException
+ */
+ void deleteTopic(TopicPath topic) throws IOException;
+
+ /**
+ * Return a list of topics for {@code project}.
+ *
+ * @throws IOException
+ */
+ Collection<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+ /**
+ * Create {@code subscription} to {@code topic}.
+ *
+ * @throws IOException
+ */
+ void createSubscription(
+ TopicPath topic, SubscriptionPath subscription,
+ int ackDeadlineSeconds) throws IOException;
+
+ /**
+ * Delete {@code subscription}.
+ *
+ * @throws IOException
+ */
+ void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return a list of subscriptions for {@code topic} in {@code project}.
+ *
+ * @throws IOException
+ */
+ Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06d47099/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
new file mode 100644
index 0000000..6e34705
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.api.client.util.DateTime;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeleteSubscriptionRequest;
+import com.google.pubsub.v1.DeleteTopicRequest;
+import com.google.pubsub.v1.ListSubscriptionsRequest;
+import com.google.pubsub.v1.ListSubscriptionsResponse;
+import com.google.pubsub.v1.ListTopicsRequest;
+import com.google.pubsub.v1.ListTopicsResponse;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for talking to Pubsub via grpc.
+ */
+public class PubsubGrpcClient implements PubsubClient {
+ private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
+ private static final int PUBSUB_PORT = 443;
+ private static final List<String> PUBSUB_SCOPES =
+ Collections.singletonList("https://www.googleapis.com/auth/pubsub");
+ private static final int LIST_BATCH_SIZE = 1000;
+
+ /**
+ * Timeout for grpc calls (in s).
+ */
+ private static final int TIMEOUT_S = 15;
+
+ /**
+ * Underlying netty channel, or {@literal null} if closed.
+ */
+ @Nullable
+ private ManagedChannel publisherChannel;
+
+ /**
+ * Credentials determined from options and environment.
+ */
+ private final GoogleCredentials credentials;
+
+ /**
+ * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+ * instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Cached stubs, or null if not cached.
+ */
+ @Nullable
+ private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
+ private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
+
+ private PubsubGrpcClient(
+ @Nullable String timestampLabel, @Nullable String idLabel,
+ ManagedChannel publisherChannel, GoogleCredentials credentials) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.publisherChannel = publisherChannel;
+ this.credentials = credentials;
+ }
+
+ /**
+ * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order
+ * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources
+ * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use
+ * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within
+ * message metadata.
+ */
+ public static PubsubGrpcClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel,
+ GcpOptions options) throws IOException {
+ ManagedChannel channel = NettyChannelBuilder
+ .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+ .build();
+ // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
+ // various command line options. It currently only supports the older
+ // com.google.api.client.auth.oauth2.Credentials.
+ GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+ return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials);
+ }
+
+ /**
+ * Gracefully close the underlying netty channel.
+ */
+ @Override
+ public void close() {
+ Preconditions.checkState(publisherChannel != null, "Client has already been closed");
+ publisherChannel.shutdown();
+ try {
+ publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore.
+ Thread.currentThread().interrupt();
+ }
+ publisherChannel = null;
+ cachedPublisherStub = null;
+ cachedSubscriberStub = null;
+ }
+
+ /**
+ * Return channel with interceptor for returning credentials.
+ */
+ private Channel newChannel() throws IOException {
+ Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
+ ClientAuthInterceptor interceptor =
+ new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
+ return ClientInterceptors.intercept(publisherChannel, interceptor);
+ }
+
+ /**
+ * Return a stub for making a publish request with a timeout.
+ */
+ private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException {
+ if (cachedPublisherStub == null) {
+ cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
+ }
+ return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Return a stub for making a subscribe request with a timeout.
+ */
+ private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException {
+ if (cachedSubscriberStub == null) {
+ cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
+ }
+ return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages)
+ throws IOException {
+ PublishRequest.Builder request = PublishRequest.newBuilder()
+ .setTopic(topic.getPath());
+ for (OutgoingMessage outgoingMessage : outgoingMessages) {
+ PubsubMessage.Builder message =
+ PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+ if (timestampLabel != null) {
+ message.getMutableAttributes()
+ .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ }
+
+ if (idLabel != null) {
+ message.getMutableAttributes()
+ .put(idLabel,
+ Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+ }
+
+ request.addMessages(message);
+ }
+
+ PublishResponse response = publisherStub().publish(request.build());
+ return response.getMessageIdsCount();
+ }
+
+ @Override
+ public Collection<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize) throws IOException {
+ PullRequest request = PullRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .setReturnImmediately(true)
+ .setMaxMessages(batchSize)
+ .build();
+ PullResponse response = subscriberStub().pull(request);
+ if (response.getReceivedMessagesCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
+ for (ReceivedMessage message : response.getReceivedMessagesList()) {
+ PubsubMessage pubsubMessage = message.getMessage();
+ Map<String, String> attributes = pubsubMessage.getAttributes();
+
+ // Payload.
+ byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
+ // Timestamp.
+ // Start with Pubsub processing time.
+ Timestamp timestampProto = pubsubMessage.getPublishTime();
+ long timestampMsSinceEpoch = timestampProto.getSeconds() + timestampProto.getNanos() / 1000L;
+ if (timestampLabel != null && attributes != null) {
+ String timestampString = attributes.get(timestampLabel);
+ if (timestampString != null && !timestampString.isEmpty()) {
+ try {
+ // Try parsing as milliseconds since epoch. Note there is no way to parse a
+ // string in RFC 3339 format here.
+ // Expected IllegalArgumentException if parsing fails; we use that to fall back
+ // to RFC 3339.
+ timestampMsSinceEpoch = Long.parseLong(timestampString);
+ } catch (IllegalArgumentException e1) {
+ try {
+ // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
+ // IllegalArgumentException if parsing fails, and the caller should handle.
+ timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue();
+ } catch (IllegalArgumentException e2) {
+ // Fallback to Pubsub processing time.
+ }
+ }
+ }
+ // else: fallback to Pubsub processing time.
+ }
+ // else: fallback to Pubsub processing time.
+
+ // Ack id.
+ String ackId = message.getAckId();
+ Preconditions.checkState(ackId != null && !ackId.isEmpty());
+
+ // Record id, if any.
+ @Nullable byte[] recordId = null;
+ if (idLabel != null && attributes != null) {
+ String recordIdString = attributes.get(idLabel);
+ if (recordIdString != null && !recordIdString.isEmpty()) {
+ recordId = recordIdString.getBytes();
+ }
+ }
+ if (recordId == null) {
+ recordId = pubsubMessage.getMessageId().getBytes();
+ }
+
+ incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId));
+ }
+ return incomingMessages;
+ }
+
+ @Override
+ public void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds)
+ throws IOException {
+ AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .addAllAckIds(ackIds)
+ .build();
+ subscriberStub().acknowledge(request); // ignore Empty result.
+ }
+
+ @Override
+ public void modifyAckDeadline(
+ SubscriptionPath subscription, Iterable<String> ackIds, int
+ deadlineSeconds)
+ throws IOException {
+ ModifyAckDeadlineRequest request =
+ ModifyAckDeadlineRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .addAllAckIds(ackIds)
+ .setAckDeadlineSeconds(deadlineSeconds)
+ .build();
+ subscriberStub().modifyAckDeadline(request); // ignore Empty result.
+ }
+
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+ Topic request = Topic.newBuilder()
+ .setName(topic.getPath())
+ .build();
+ publisherStub().createTopic(request); // ignore Topic result.
+ }
+
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+ DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
+ .setTopic(topic.getPath())
+ .build();
+ publisherStub().deleteTopic(request); // ignore Empty result.
+ }
+
+ @Override
+ public Collection<TopicPath> listTopics(ProjectPath project) throws IOException {
+ ListTopicsRequest.Builder request =
+ ListTopicsRequest.newBuilder()
+ .setProject(project.getPath())
+ .setPageSize(LIST_BATCH_SIZE);
+ ListTopicsResponse response = publisherStub().listTopics(request.build());
+ if (response.getTopicsCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
+ while (true) {
+ for (Topic topic : response.getTopicsList()) {
+ topics.add(new TopicPath(topic.getName()));
+ }
+ if (response.getNextPageToken().isEmpty()) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = publisherStub().listTopics(request.build());
+ }
+ return topics;
+ }
+
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription,
+ int ackDeadlineSeconds) throws IOException {
+ Subscription request = Subscription.newBuilder()
+ .setTopic(topic.getPath())
+ .setName(subscription.getPath())
+ .setAckDeadlineSeconds(ackDeadlineSeconds)
+ .build();
+ subscriberStub().createSubscription(request); // ignore Subscription result.
+ }
+
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+ DeleteSubscriptionRequest request =
+ DeleteSubscriptionRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .build();
+ subscriberStub().deleteSubscription(request); // ignore Empty result.
+ }
+
+ @Override
+ public Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException {
+ ListSubscriptionsRequest.Builder request =
+ ListSubscriptionsRequest.newBuilder()
+ .setProject(project.getPath())
+ .setPageSize(LIST_BATCH_SIZE);
+ ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
+ if (response.getSubscriptionsCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
+ while (true) {
+ for (Subscription subscription : response.getSubscriptionsList()) {
+ if (subscription.getTopic().equals(topic.getPath())) {
+ subscriptions.add(new SubscriptionPath(subscription.getName()));
+ }
+ }
+ if (response.getNextPageToken().isEmpty()) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = subscriberStub().listSubscriptions(request.build());
+ }
+ return subscriptions;
+ }
+}
[2/2] incubator-beam git commit: This closes #120
Posted by ke...@apache.org.
This closes #120
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46c82acf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46c82acf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46c82acf
Branch: refs/heads/master
Commit: 46c82acf5b7f8f8861958d1ae0cdc114b1347f48
Parents: 9f41ddb 06d4709
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Apr 12 11:19:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 12 11:19:41 2016 -0700
----------------------------------------------------------------------
pom.xml | 1 +
sdks/java/core/pom.xml | 34 ++
.../cloud/dataflow/sdk/io/PubsubClient.java | 322 +++++++++++++++
.../cloud/dataflow/sdk/io/PubsubGrpcClient.java | 401 +++++++++++++++++++
4 files changed, 758 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46c82acf/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46c82acf/sdks/java/core/pom.xml
----------------------------------------------------------------------