You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/12 15:17:01 UTC
[4/8] beam git commit: [BEAM-1722] Move PubsubIO into the
google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
deleted file mode 100644
index 6d4cf4e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.auth.Credentials;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase;
-import io.grpc.ManagedChannel;
-import io.grpc.Server;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for PubsubGrpcClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubGrpcClientTest {
- private ManagedChannel inProcessChannel;
- private Credentials testCredentials;
-
- private PubsubClient client;
- private String channelName;
-
- private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
- private static final SubscriptionPath SUBSCRIPTION =
- PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
- private static final long REQ_TIME = 1234L;
- private static final long PUB_TIME = 3456L;
- private static final long MESSAGE_TIME = 6789L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
- private static final String MESSAGE_ID = "testMessageId";
- private static final String DATA = "testData";
- private static final String RECORD_ID = "testRecordId";
- private static final String ACK_ID = "testAckId";
- private static final Map<String, String> ATTRIBUTES =
- ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
-
- @Before
- public void setup() {
- channelName = String.format("%s-%s",
- PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt());
- inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build();
- testCredentials = new TestCredential();
- client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials);
- }
-
- @After
- public void teardown() throws IOException {
- client.close();
- inProcessChannel.shutdownNow();
- }
-
- @Test
- public void pullOneMessage() throws IOException {
- String expectedSubscription = SUBSCRIPTION.getPath();
- final PullRequest expectedRequest =
- PullRequest.newBuilder()
- .setSubscription(expectedSubscription)
- .setReturnImmediately(true)
- .setMaxMessages(10)
- .build();
- Timestamp timestamp = Timestamp.newBuilder()
- .setSeconds(PUB_TIME / 1000)
- .setNanos((int) (PUB_TIME % 1000) * 1000)
- .build();
- PubsubMessage expectedPubsubMessage =
- PubsubMessage.newBuilder()
- .setMessageId(MESSAGE_ID)
- .setData(
- ByteString.copyFrom(DATA.getBytes()))
- .setPublishTime(timestamp)
- .putAllAttributes(ATTRIBUTES)
- .putAllAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL,
- String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID))
- .build();
- ReceivedMessage expectedReceivedMessage =
- ReceivedMessage.newBuilder()
- .setMessage(expectedPubsubMessage)
- .setAckId(ACK_ID)
- .build();
- final PullResponse response =
- PullResponse.newBuilder()
- .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
- .build();
-
- final List<PullRequest> requestsReceived = new ArrayList<>();
- SubscriberImplBase subscriberImplBase = new SubscriberImplBase() {
- @Override
- public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
- requestsReceived.add(request);
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- }
- };
- Server server = InProcessServerBuilder.forName(channelName)
- .addService(subscriberImplBase)
- .build()
- .start();
- try {
- List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
- assertEquals(1, acutalMessages.size());
- IncomingMessage actualMessage = acutalMessages.get(0);
- assertEquals(ACK_ID, actualMessage.ackId);
- assertEquals(DATA, new String(actualMessage.elementBytes));
- assertEquals(RECORD_ID, actualMessage.recordId);
- assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
- assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
- assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
- } finally {
- server.shutdownNow();
- }
- }
-
- @Test
- public void publishOneMessage() throws IOException {
- String expectedTopic = TOPIC.getPath();
- PubsubMessage expectedPubsubMessage =
- PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(DATA.getBytes()))
- .putAllAttributes(ATTRIBUTES)
- .putAllAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID))
- .build();
- final PublishRequest expectedRequest =
- PublishRequest.newBuilder()
- .setTopic(expectedTopic)
- .addAllMessages(
- ImmutableList.of(expectedPubsubMessage))
- .build();
- final PublishResponse response =
- PublishResponse.newBuilder()
- .addAllMessageIds(ImmutableList.of(MESSAGE_ID))
- .build();
-
- final List<PublishRequest> requestsReceived = new ArrayList<>();
- PublisherImplBase publisherImplBase = new PublisherImplBase() {
- @Override
- public void publish(
- PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
- requestsReceived.add(request);
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- }
- };
- Server server = InProcessServerBuilder.forName(channelName)
- .addService(publisherImplBase)
- .build()
- .start();
- try {
- OutgoingMessage actualMessage = new OutgoingMessage(
- DATA.getBytes(), ATTRIBUTES, MESSAGE_TIME, RECORD_ID);
- int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
- assertEquals(1, n);
- assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
- } finally {
- server.shutdownNow();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
deleted file mode 100644
index 019190b..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/**
- * Tests for PubsubJsonClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubJsonClientTest {
- private Pubsub mockPubsub;
- private PubsubClient client;
-
- private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
- private static final SubscriptionPath SUBSCRIPTION =
- PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
- private static final long REQ_TIME = 1234L;
- private static final long PUB_TIME = 3456L;
- private static final long MESSAGE_TIME = 6789L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
- private static final String MESSAGE_ID = "testMessageId";
- private static final String DATA = "testData";
- private static final String RECORD_ID = "testRecordId";
- private static final String ACK_ID = "testAckId";
-
- @Before
- public void setup() throws IOException {
- mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
- client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
- }
-
- @After
- public void teardown() throws IOException {
- client.close();
- client = null;
- mockPubsub = null;
- }
-
- @Test
- public void pullOneMessage() throws IOException {
- String expectedSubscription = SUBSCRIPTION.getPath();
- PullRequest expectedRequest =
- new PullRequest().setReturnImmediately(true).setMaxMessages(10);
- PubsubMessage expectedPubsubMessage = new PubsubMessage()
- .setMessageId(MESSAGE_ID)
- .encodeData(DATA.getBytes())
- .setPublishTime(String.valueOf(PUB_TIME))
- .setAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID));
- ReceivedMessage expectedReceivedMessage =
- new ReceivedMessage().setMessage(expectedPubsubMessage)
- .setAckId(ACK_ID);
- PullResponse expectedResponse =
- new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
- Mockito.when((Object) (mockPubsub.projects()
- .subscriptions()
- .pull(expectedSubscription, expectedRequest)
- .execute()))
- .thenReturn(expectedResponse);
- List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
- assertEquals(1, acutalMessages.size());
- IncomingMessage actualMessage = acutalMessages.get(0);
- assertEquals(ACK_ID, actualMessage.ackId);
- assertEquals(DATA, new String(actualMessage.elementBytes));
- assertEquals(RECORD_ID, actualMessage.recordId);
- assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
- assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
- }
-
- @Test
- public void publishOneMessage() throws IOException {
- String expectedTopic = TOPIC.getPath();
- PubsubMessage expectedPubsubMessage = new PubsubMessage()
- .encodeData(DATA.getBytes())
- .setAttributes(
- ImmutableMap.<String, String> builder()
- .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
- .put(ID_LABEL, RECORD_ID)
- .put("k", "v").build());
- PublishRequest expectedRequest = new PublishRequest()
- .setMessages(ImmutableList.of(expectedPubsubMessage));
- PublishResponse expectedResponse = new PublishResponse()
- .setMessageIds(ImmutableList.of(MESSAGE_ID));
- Mockito.when((Object) (mockPubsub.projects()
- .topics()
- .publish(expectedTopic, expectedRequest)
- .execute()))
- .thenReturn(expectedResponse);
- Map<String, String> attrs = new HashMap<>();
- attrs.put("k", "v");
- OutgoingMessage actualMessage = new OutgoingMessage(
- DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
- int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
- assertEquals(1, n);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
deleted file mode 100644
index a1b7daf..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for PubsubTestClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubTestClientTest {
- private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
- private static final SubscriptionPath SUBSCRIPTION =
- PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
- private static final long REQ_TIME = 1234L;
- private static final long MESSAGE_TIME = 6789L;
- private static final String MESSAGE_ID = "testMessageId";
- private static final String DATA = "testData";
- private static final String ACK_ID = "testAckId";
- private static final int ACK_TIMEOUT_S = 60;
-
- @Test
- public void pullOneMessage() throws IOException {
- final AtomicLong now = new AtomicLong();
- Clock clock = new Clock() {
- @Override
- public long currentTimeMillis() {
- return now.get();
- }
- };
- IncomingMessage expectedIncomingMessage =
- new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
- try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
- Lists.newArrayList(expectedIncomingMessage))) {
- try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
- now.set(REQ_TIME);
- client.advance();
- List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
- assertEquals(1, incomingMessages.size());
- assertEquals(expectedIncomingMessage, incomingMessages.get(0));
- // Timeout on ACK.
- now.addAndGet((ACK_TIMEOUT_S + 10) * 1000);
- client.advance();
- incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
- assertEquals(1, incomingMessages.size());
- assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
- now.addAndGet(10 * 1000);
- client.advance();
- // Extend ack
- client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
- // Timeout on extended ACK
- now.addAndGet(30 * 1000);
- client.advance();
- incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
- assertEquals(1, incomingMessages.size());
- assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
- // Extend ack
- client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
- // Ack
- now.addAndGet(15 * 1000);
- client.advance();
- client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
- }
- }
- }
-
- @Test
- public void publishOneMessage() throws IOException {
- OutgoingMessage expectedOutgoingMessage =
- new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID);
- try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(
- TOPIC,
- Sets.newHashSet(expectedOutgoingMessage),
- ImmutableList.<OutgoingMessage>of())) {
- try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
- client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 1a2e100..d22c6c5 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -84,6 +84,16 @@
</dependency>
<dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-pubsub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.api.grpc</groupId>
+ <artifactId>grpc-google-pubsub-v1</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
@@ -106,10 +116,44 @@
<dependency>
<groupId>io.grpc</groupId>
+ <artifactId>grpc-auth</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+
+ <!-- grpc-all does not obey IWYU, so we need to exclude from compile
+ scope and depend on it at runtime. -->
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
new file mode 100644
index 0000000..750178c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.DateTime;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * An (abstract) helper class for talking to Pubsub via an underlying transport.
+ */
+abstract class PubsubClient implements Closeable {
+ /**
+ * Factory for creating clients.
+ */
+ public interface PubsubClientFactory extends Serializable {
+ /**
+ * Construct a new Pubsub client. It should be closed via {@link #close} in order
+ * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
+ * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
+ * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
+ * timestamps/ids within message metadata.
+ */
+ PubsubClient newClient(
+ @Nullable String timestampLabel,
+ @Nullable String idLabel,
+ PubsubOptions options) throws IOException;
+
+ /**
+ * Return the display name for this factory. Eg "Json", "gRPC".
+ */
+ String getKind();
+ }
+
+ /**
+ * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}.
+ * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException}
+ * if timestamp cannot be recognized.
+ */
+ @Nullable
+ private static Long asMsSinceEpoch(@Nullable String timestamp) {
+ if (Strings.isNullOrEmpty(timestamp)) {
+ return null;
+ }
+ try {
+ // Try parsing as milliseconds since epoch. Note there is no way to parse a
+ // string in RFC 3339 format here.
+ // Expected IllegalArgumentException if parsing fails; we use that to fall back
+ // to RFC 3339.
+ return Long.parseLong(timestamp);
+ } catch (IllegalArgumentException e1) {
+ // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
+ // IllegalArgumentException if parsing fails, and the caller should handle.
+ return DateTime.parseRfc3339(timestamp).getValue();
+ }
+ }
+
+ /**
+ * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
+ * attributes} and {@code pubsubTimestamp}.
+ *
+ * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
+ * that label, and the value of that label will be taken as the timestamp.
+ * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
+ * pubsubTimestamp}.
+ *
+ * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
+ * or RFC3339 time.
+ */
+ protected static long extractTimestamp(
+ @Nullable String timestampLabel,
+ @Nullable String pubsubTimestamp,
+ @Nullable Map<String, String> attributes) {
+ Long timestampMsSinceEpoch;
+ if (Strings.isNullOrEmpty(timestampLabel)) {
+ timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
+ checkArgument(timestampMsSinceEpoch != null,
+ "Cannot interpret PubSub publish timestamp: %s",
+ pubsubTimestamp);
+ } else {
+ String value = attributes == null ? null : attributes.get(timestampLabel);
+ checkArgument(value != null,
+ "PubSub message is missing a value for timestamp label %s",
+ timestampLabel);
+ timestampMsSinceEpoch = asMsSinceEpoch(value);
+ checkArgument(timestampMsSinceEpoch != null,
+ "Cannot interpret value of label %s as timestamp: %s",
+ timestampLabel, value);
+ }
+ return timestampMsSinceEpoch;
+ }
+
+ /**
+ * Path representing a cloud project id.
+ */
+ static class ProjectPath implements Serializable {
+ private final String projectId;
+
+ /**
+ * Creates a {@link ProjectPath} from a {@link String} representation, which
+ * must be of the form {@code "projects/" + projectId}.
+ */
+ ProjectPath(String path) {
+ String[] splits = path.split("/");
+ checkArgument(
+ splits.length == 2 && splits[0].equals("projects"),
+ "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>",
+ path);
+ this.projectId = splits[1];
+ }
+
+ public String getPath() {
+ return String.format("projects/%s", projectId);
+ }
+
+ public String getId() {
+ return projectId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ProjectPath that = (ProjectPath) o;
+
+ return projectId.equals(that.projectId);
+ }
+
+ @Override
+ public int hashCode() {
+ return projectId.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getPath();
+ }
+ }
+
+ public static ProjectPath projectPathFromPath(String path) {
+ return new ProjectPath(path);
+ }
+
+ public static ProjectPath projectPathFromId(String projectId) {
+ return new ProjectPath(String.format("projects/%s", projectId));
+ }
+
+ /**
+ * Path representing a Pubsub subscription.
+ */
+ public static class SubscriptionPath implements Serializable {
+ private final String projectId;
+ private final String subscriptionName;
+
+ SubscriptionPath(String path) {
+ String[] splits = path.split("/");
+ checkState(
+ splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"),
+ "Malformed subscription path %s: "
+ + "must be of the form \"projects/\" + <project id> + \"subscriptions\"", path);
+ this.projectId = splits[1];
+ this.subscriptionName = splits[3];
+ }
+
+ public String getPath() {
+ return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
+ }
+
+ public String getName() {
+ return subscriptionName;
+ }
+
+ public String getV1Beta1Path() {
+ return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubscriptionPath that = (SubscriptionPath) o;
+ return this.subscriptionName.equals(that.subscriptionName)
+ && this.projectId.equals(that.projectId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(projectId, subscriptionName);
+ }
+
+ @Override
+ public String toString() {
+ return getPath();
+ }
+ }
+
+ public static SubscriptionPath subscriptionPathFromPath(String path) {
+ return new SubscriptionPath(path);
+ }
+
+ public static SubscriptionPath subscriptionPathFromName(
+ String projectId, String subscriptionName) {
+ return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
+ projectId, subscriptionName));
+ }
+
+ /**
+ * Path representing a Pubsub topic.
+ */
+ public static class TopicPath implements Serializable {
+ private final String path;
+
+ TopicPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getName() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed topic path %s", path);
+ return splits[3];
+ }
+
+ public String getV1Beta1Path() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed topic path %s", path);
+ return String.format("/topics/%s/%s", splits[1], splits[3]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopicPath topicPath = (TopicPath) o;
+ return path.equals(topicPath.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+ }
+
+ public static TopicPath topicPathFromPath(String path) {
+ return new TopicPath(path);
+ }
+
+ public static TopicPath topicPathFromName(String projectId, String topicName) {
+ return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
+ }
+
+ /**
+ * A message to be sent to Pubsub.
+ *
+ * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+ * Java serialization is never used for non-test clients.
+ */
+ static class OutgoingMessage implements Serializable {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ public final Map<String, String> attributes;
+
+ /**
+ * Timestamp for element (ms since epoch).
+ */
+ public final long timestampMsSinceEpoch;
+
+ /**
+ * If using an id label, the record id to associate with this record's metadata so the receiver
+ * can reject duplicates. Otherwise {@literal null}.
+ */
+ @Nullable
+ public final String recordId;
+
+ public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
+ long timestampMsSinceEpoch, @Nullable String recordId) {
+ this.elementBytes = elementBytes;
+ this.attributes = attributes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.recordId = recordId;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("OutgoingMessage(%db, %dms)",
+ elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OutgoingMessage that = (OutgoingMessage) o;
+
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes)
+ && Objects.equal(recordId, that.recordId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+ recordId);
+ }
+ }
+
+ /**
+ * A message received from Pubsub.
+ *
+ * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+ * Java serialization is never used for non-test clients.
+ */
+ static class IncomingMessage implements Serializable {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ public Map<String, String> attributes;
+
+ /**
+ * Timestamp for element (ms since epoch). Either Pubsub's processing time,
+ * or the custom timestamp associated with the message.
+ */
+ public final long timestampMsSinceEpoch;
+
+ /**
+ * Timestamp (in system time) at which we requested the message (ms since epoch).
+ */
+ public final long requestTimeMsSinceEpoch;
+
+ /**
+ * Id to pass back to Pubsub to acknowledge receipt of this message.
+ */
+ public final String ackId;
+
+ /**
+ * Id to pass to the runner to distinguish this message from all others.
+ */
+ public final String recordId;
+
+ public IncomingMessage(
+ byte[] elementBytes,
+ Map<String, String> attributes,
+ long timestampMsSinceEpoch,
+ long requestTimeMsSinceEpoch,
+ String ackId,
+ String recordId) {
+ this.elementBytes = elementBytes;
+ this.attributes = attributes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+ this.ackId = ackId;
+ this.recordId = recordId;
+ }
+
+ public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+ return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("IncomingMessage(%db, %dms)",
+ elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ IncomingMessage that = (IncomingMessage) o;
+
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+ && ackId.equals(that.ackId)
+ && recordId.equals(that.recordId)
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch,
+ ackId, recordId);
+ }
+ }
+
+ /**
+ * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+ * published.
+ */
+ public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+ throws IOException;
+
+ /**
+ * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+ * Return the received messages, or empty collection if none were available. Does not
+ * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
+ * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
+ */
+ public abstract List<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize,
+ boolean returnImmediately)
+ throws IOException;
+
+ /**
+ * Acknowldege messages from {@code subscription} with {@code ackIds}.
+ */
+ public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+ throws IOException;
+
+ /**
+ * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+ * be {@code deadlineSeconds} from now.
+ */
+ public abstract void modifyAckDeadline(
+ SubscriptionPath subscription, List<String> ackIds,
+ int deadlineSeconds) throws IOException;
+
+ /**
+ * Create {@code topic}.
+ */
+ public abstract void createTopic(TopicPath topic) throws IOException;
+
+ /*
+ * Delete {@code topic}.
+ */
+ public abstract void deleteTopic(TopicPath topic) throws IOException;
+
+ /**
+ * Return a list of topics for {@code project}.
+ */
+ public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+ /**
+ * Create {@code subscription} to {@code topic}.
+ */
+ public abstract void createSubscription(
+ TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
+
+ /**
+ * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It
+ * is the responsibility of the caller to later delete the subscription.
+ */
+ public SubscriptionPath createRandomSubscription(
+ ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
+ // Create a randomized subscription derived from the topic name.
+ String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
+ SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName);
+ createSubscription(topic, subscription, ackDeadlineSeconds);
+ return subscription;
+ }
+
+ /**
+ * Delete {@code subscription}.
+ */
+ public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return a list of subscriptions for {@code topic} in {@code project}.
+ */
+ public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException;
+
+ /**
+ * Return the ack deadline, in seconds, for {@code subscription}.
+ */
+ public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return {@literal true} if {@link #pull} will always return empty list. Actual clients
+ * will return {@literal false}. Test clients may return {@literal true} to signal that all
+ * expected messages have been pulled and the test may complete.
+ */
+ public abstract boolean isEOF();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
new file mode 100644
index 0000000..912d59c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auth.Credentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeleteSubscriptionRequest;
+import com.google.pubsub.v1.DeleteTopicRequest;
+import com.google.pubsub.v1.GetSubscriptionRequest;
+import com.google.pubsub.v1.ListSubscriptionsRequest;
+import com.google.pubsub.v1.ListSubscriptionsResponse;
+import com.google.pubsub.v1.ListTopicsRequest;
+import com.google.pubsub.v1.ListTopicsResponse;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * A helper class for talking to Pubsub via grpc.
+ *
+ * <p>CAUTION: Currently uses the application default credentials and does not respect any
+ * credentials-related arguments in {@link GcpOptions}.
+ */
+class PubsubGrpcClient extends PubsubClient {
+ private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
+ private static final int PUBSUB_PORT = 443;
+ private static final int LIST_BATCH_SIZE = 1000;
+
+ private static final int DEFAULT_TIMEOUT_S = 15;
+
+ private static class PubsubGrpcClientFactory implements PubsubClientFactory {
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ ManagedChannel channel = NettyChannelBuilder
+ .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+ .build();
+
+ return new PubsubGrpcClient(timestampLabel,
+ idLabel,
+ DEFAULT_TIMEOUT_S,
+ channel,
+ options.getGcpCredential());
+ }
+
+ @Override
+ public String getKind() {
+ return "Grpc";
+ }
+ }
+
+ /**
+ * Factory for creating Pubsub clients using gRCP transport.
+ */
+ public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory();
+
+ /**
+ * Timeout for grpc calls (in s).
+ */
+ private final int timeoutSec;
+
+ /**
+ * Underlying netty channel, or {@literal null} if closed.
+ */
+ @Nullable
+ private ManagedChannel publisherChannel;
+
+ /**
+ * Credentials determined from options and environment.
+ */
+ private final Credentials credentials;
+
+ /**
+ * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+ * instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+ */
+ @Nullable
+ private final String idLabel;
+
+
+ /**
+ * Cached stubs, or null if not cached.
+ */
+ @Nullable
+ private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
+ private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
+
+ @VisibleForTesting
+ PubsubGrpcClient(
+ @Nullable String timestampLabel,
+ @Nullable String idLabel,
+ int timeoutSec,
+ ManagedChannel publisherChannel,
+ Credentials credentials) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.timeoutSec = timeoutSec;
+ this.publisherChannel = publisherChannel;
+ this.credentials = credentials;
+ }
+
+ /**
+ * Gracefully close the underlying netty channel.
+ */
+ @Override
+ public void close() {
+ if (publisherChannel == null) {
+ // Already closed.
+ return;
+ }
+ // Can gc the underlying stubs.
+ cachedPublisherStub = null;
+ cachedSubscriberStub = null;
+ // Mark the client as having been closed before going further
+ // in case we have an exception from the channel.
+ ManagedChannel publisherChannel = this.publisherChannel;
+ this.publisherChannel = null;
+ // Gracefully shutdown the channel.
+ publisherChannel.shutdown();
+ try {
+ publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Return channel with interceptor for returning credentials.
+ */
+ private Channel newChannel() throws IOException {
+ checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
+ ClientAuthInterceptor interceptor =
+ new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
+ return ClientInterceptors.intercept(publisherChannel, interceptor);
+ }
+
+ /**
+ * Return a stub for making a publish request with a timeout.
+ */
+ private PublisherBlockingStub publisherStub() throws IOException {
+ if (cachedPublisherStub == null) {
+ cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
+ }
+ return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Return a stub for making a subscribe request with a timeout.
+ */
+ private SubscriberBlockingStub subscriberStub() throws IOException {
+ if (cachedSubscriberStub == null) {
+ cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
+ }
+ return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+ throws IOException {
+ PublishRequest.Builder request = PublishRequest.newBuilder()
+ .setTopic(topic.getPath());
+ for (OutgoingMessage outgoingMessage : outgoingMessages) {
+ PubsubMessage.Builder message =
+ PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+ if (outgoingMessage.attributes != null) {
+ message.putAllAttributes(outgoingMessage.attributes);
+ }
+
+ if (timestampLabel != null) {
+ message.getMutableAttributes()
+ .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ }
+
+ if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
+ }
+
+ request.addMessages(message);
+ }
+
+ PublishResponse response = publisherStub().publish(request.build());
+ return response.getMessageIdsCount();
+ }
+
+ @Override
+ public List<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize,
+ boolean returnImmediately) throws IOException {
+ PullRequest request = PullRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .setReturnImmediately(returnImmediately)
+ .setMaxMessages(batchSize)
+ .build();
+ PullResponse response = subscriberStub().pull(request);
+ if (response.getReceivedMessagesCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
+ for (ReceivedMessage message : response.getReceivedMessagesList()) {
+ PubsubMessage pubsubMessage = message.getMessage();
+ @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+ // Payload.
+ byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
+ // Timestamp.
+ String pubsubTimestampString = null;
+ Timestamp timestampProto = pubsubMessage.getPublishTime();
+ if (timestampProto != null) {
+ pubsubTimestampString = String.valueOf(timestampProto.getSeconds()
+ + timestampProto.getNanos() / 1000L);
+ }
+ long timestampMsSinceEpoch =
+ extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+
+ // Ack id.
+ String ackId = message.getAckId();
+ checkState(!Strings.isNullOrEmpty(ackId));
+
+ // Record id, if any.
+ @Nullable String recordId = null;
+ if (idLabel != null && attributes != null) {
+ recordId = attributes.get(idLabel);
+ }
+ if (Strings.isNullOrEmpty(recordId)) {
+ // Fall back to the Pubsub provided message id.
+ recordId = pubsubMessage.getMessageId();
+ }
+
+ incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId));
+ }
+ return incomingMessages;
+ }
+
+ @Override
+ public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+ throws IOException {
+ AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .addAllAckIds(ackIds)
+ .build();
+ subscriberStub().acknowledge(request); // ignore Empty result.
+ }
+
+ @Override
+ public void modifyAckDeadline(
+ SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+ throws IOException {
+ ModifyAckDeadlineRequest request =
+ ModifyAckDeadlineRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .addAllAckIds(ackIds)
+ .setAckDeadlineSeconds(deadlineSeconds)
+ .build();
+ subscriberStub().modifyAckDeadline(request); // ignore Empty result.
+ }
+
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+ Topic request = Topic.newBuilder()
+ .setName(topic.getPath())
+ .build();
+ publisherStub().createTopic(request); // ignore Topic result.
+ }
+
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+ DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
+ .setTopic(topic.getPath())
+ .build();
+ publisherStub().deleteTopic(request); // ignore Empty result.
+ }
+
+ @Override
+ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+ ListTopicsRequest.Builder request =
+ ListTopicsRequest.newBuilder()
+ .setProject(project.getPath())
+ .setPageSize(LIST_BATCH_SIZE);
+ ListTopicsResponse response = publisherStub().listTopics(request.build());
+ if (response.getTopicsCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
+ while (true) {
+ for (Topic topic : response.getTopicsList()) {
+ topics.add(topicPathFromPath(topic.getName()));
+ }
+ if (response.getNextPageToken().isEmpty()) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = publisherStub().listTopics(request.build());
+ }
+ return topics;
+ }
+
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription,
+ int ackDeadlineSeconds) throws IOException {
+ Subscription request = Subscription.newBuilder()
+ .setTopic(topic.getPath())
+ .setName(subscription.getPath())
+ .setAckDeadlineSeconds(ackDeadlineSeconds)
+ .build();
+ subscriberStub().createSubscription(request); // ignore Subscription result.
+ }
+
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+ DeleteSubscriptionRequest request =
+ DeleteSubscriptionRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .build();
+ subscriberStub().deleteSubscription(request); // ignore Empty result.
+ }
+
+ @Override
+ public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException {
+ ListSubscriptionsRequest.Builder request =
+ ListSubscriptionsRequest.newBuilder()
+ .setProject(project.getPath())
+ .setPageSize(LIST_BATCH_SIZE);
+ ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
+ if (response.getSubscriptionsCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
+ while (true) {
+ for (Subscription subscription : response.getSubscriptionsList()) {
+ if (subscription.getTopic().equals(topic.getPath())) {
+ subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+ }
+ }
+ if (response.getNextPageToken().isEmpty()) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = subscriberStub().listSubscriptions(request.build());
+ }
+ return subscriptions;
+ }
+
+ @Override
+ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+ GetSubscriptionRequest request =
+ GetSubscriptionRequest.newBuilder()
+ .setSubscription(subscription.getPath())
+ .build();
+ Subscription response = subscriberStub().getSubscription(request);
+ return response.getAckDeadlineSeconds();
+ }
+
+ @Override
+ public boolean isEOF() {
+ return false;
+ }
+}