You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/12 15:17:01 UTC

[4/8] beam git commit: [BEAM-1722] Move PubsubIO into the google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
deleted file mode 100644
index 6d4cf4e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.auth.Credentials;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase;
-import io.grpc.ManagedChannel;
-import io.grpc.Server;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for PubsubGrpcClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubGrpcClientTest {
-  private ManagedChannel inProcessChannel;
-  private Credentials testCredentials;
-
-  private PubsubClient client;
-  private String channelName;
-
-  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
-  private static final SubscriptionPath SUBSCRIPTION =
-      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
-  private static final long REQ_TIME = 1234L;
-  private static final long PUB_TIME = 3456L;
-  private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
-  private static final String MESSAGE_ID = "testMessageId";
-  private static final String DATA = "testData";
-  private static final String RECORD_ID = "testRecordId";
-  private static final String ACK_ID = "testAckId";
-  private static final Map<String, String> ATTRIBUTES =
-          ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
-
-  @Before
-  public void setup() {
-    channelName = String.format("%s-%s",
-        PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt());
-    inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build();
-    testCredentials = new TestCredential();
-    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials);
-  }
-
-  @After
-  public void teardown() throws IOException {
-    client.close();
-    inProcessChannel.shutdownNow();
-  }
-
-  @Test
-  public void pullOneMessage() throws IOException {
-    String expectedSubscription = SUBSCRIPTION.getPath();
-    final PullRequest expectedRequest =
-        PullRequest.newBuilder()
-                   .setSubscription(expectedSubscription)
-                   .setReturnImmediately(true)
-                   .setMaxMessages(10)
-                   .build();
-    Timestamp timestamp = Timestamp.newBuilder()
-                                   .setSeconds(PUB_TIME / 1000)
-                                   .setNanos((int) (PUB_TIME % 1000) * 1000)
-                                   .build();
-    PubsubMessage expectedPubsubMessage =
-        PubsubMessage.newBuilder()
-                     .setMessageId(MESSAGE_ID)
-                     .setData(
-                         ByteString.copyFrom(DATA.getBytes()))
-                     .setPublishTime(timestamp)
-                     .putAllAttributes(ATTRIBUTES)
-                     .putAllAttributes(
-                         ImmutableMap.of(TIMESTAMP_LABEL,
-                                         String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, RECORD_ID))
-                     .build();
-    ReceivedMessage expectedReceivedMessage =
-        ReceivedMessage.newBuilder()
-                       .setMessage(expectedPubsubMessage)
-                       .setAckId(ACK_ID)
-                       .build();
-    final PullResponse response =
-        PullResponse.newBuilder()
-                    .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
-                    .build();
-
-    final List<PullRequest> requestsReceived = new ArrayList<>();
-    SubscriberImplBase subscriberImplBase = new SubscriberImplBase() {
-      @Override
-      public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
-        requestsReceived.add(request);
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
-      }
-    };
-    Server server = InProcessServerBuilder.forName(channelName)
-        .addService(subscriberImplBase)
-        .build()
-        .start();
-    try {
-      List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
-      assertEquals(1, acutalMessages.size());
-      IncomingMessage actualMessage = acutalMessages.get(0);
-      assertEquals(ACK_ID, actualMessage.ackId);
-      assertEquals(DATA, new String(actualMessage.elementBytes));
-      assertEquals(RECORD_ID, actualMessage.recordId);
-      assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
-      assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
-      assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
-    } finally {
-      server.shutdownNow();
-    }
-  }
-
-  @Test
-  public void publishOneMessage() throws IOException {
-    String expectedTopic = TOPIC.getPath();
-    PubsubMessage expectedPubsubMessage =
-        PubsubMessage.newBuilder()
-                     .setData(ByteString.copyFrom(DATA.getBytes()))
-                     .putAllAttributes(ATTRIBUTES)
-                     .putAllAttributes(
-                         ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, RECORD_ID))
-                     .build();
-    final PublishRequest expectedRequest =
-        PublishRequest.newBuilder()
-                      .setTopic(expectedTopic)
-                      .addAllMessages(
-                          ImmutableList.of(expectedPubsubMessage))
-                      .build();
-    final PublishResponse response =
-        PublishResponse.newBuilder()
-                       .addAllMessageIds(ImmutableList.of(MESSAGE_ID))
-                       .build();
-
-    final List<PublishRequest> requestsReceived = new ArrayList<>();
-    PublisherImplBase publisherImplBase = new PublisherImplBase() {
-      @Override
-      public void publish(
-          PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
-        requestsReceived.add(request);
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
-      }
-    };
-    Server server = InProcessServerBuilder.forName(channelName)
-        .addService(publisherImplBase)
-        .build()
-        .start();
-    try {
-      OutgoingMessage actualMessage = new OutgoingMessage(
-              DATA.getBytes(), ATTRIBUTES, MESSAGE_TIME, RECORD_ID);
-      int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
-      assertEquals(1, n);
-      assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
-    } finally {
-      server.shutdownNow();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
deleted file mode 100644
index 019190b..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/**
- * Tests for PubsubJsonClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubJsonClientTest {
-  private Pubsub mockPubsub;
-  private PubsubClient client;
-
-  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
-  private static final SubscriptionPath SUBSCRIPTION =
-      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
-  private static final long REQ_TIME = 1234L;
-  private static final long PUB_TIME = 3456L;
-  private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
-  private static final String MESSAGE_ID = "testMessageId";
-  private static final String DATA = "testData";
-  private static final String RECORD_ID = "testRecordId";
-  private static final String ACK_ID = "testAckId";
-
-  @Before
-  public void setup() throws IOException {
-    mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
-    client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
-  }
-
-  @After
-  public void teardown() throws IOException {
-    client.close();
-    client = null;
-    mockPubsub = null;
-  }
-
-  @Test
-  public void pullOneMessage() throws IOException {
-    String expectedSubscription = SUBSCRIPTION.getPath();
-    PullRequest expectedRequest =
-        new PullRequest().setReturnImmediately(true).setMaxMessages(10);
-    PubsubMessage expectedPubsubMessage = new PubsubMessage()
-        .setMessageId(MESSAGE_ID)
-        .encodeData(DATA.getBytes())
-        .setPublishTime(String.valueOf(PUB_TIME))
-        .setAttributes(
-            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, RECORD_ID));
-    ReceivedMessage expectedReceivedMessage =
-        new ReceivedMessage().setMessage(expectedPubsubMessage)
-                             .setAckId(ACK_ID);
-    PullResponse expectedResponse =
-        new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
-    Mockito.when((Object) (mockPubsub.projects()
-                               .subscriptions()
-                               .pull(expectedSubscription, expectedRequest)
-                               .execute()))
-           .thenReturn(expectedResponse);
-    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
-    assertEquals(1, acutalMessages.size());
-    IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId);
-    assertEquals(DATA, new String(actualMessage.elementBytes));
-    assertEquals(RECORD_ID, actualMessage.recordId);
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
-    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
-  }
-
-  @Test
-  public void publishOneMessage() throws IOException {
-    String expectedTopic = TOPIC.getPath();
-    PubsubMessage expectedPubsubMessage = new PubsubMessage()
-        .encodeData(DATA.getBytes())
-        .setAttributes(
-            ImmutableMap.<String, String> builder()
-                    .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
-                    .put(ID_LABEL, RECORD_ID)
-                    .put("k", "v").build());
-    PublishRequest expectedRequest = new PublishRequest()
-        .setMessages(ImmutableList.of(expectedPubsubMessage));
-    PublishResponse expectedResponse = new PublishResponse()
-        .setMessageIds(ImmutableList.of(MESSAGE_ID));
-    Mockito.when((Object) (mockPubsub.projects()
-                                .topics()
-                                .publish(expectedTopic, expectedRequest)
-                                .execute()))
-           .thenReturn(expectedResponse);
-    Map<String, String> attrs = new HashMap<>();
-    attrs.put("k", "v");
-    OutgoingMessage actualMessage = new OutgoingMessage(
-            DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
-    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
-    assertEquals(1, n);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
deleted file mode 100644
index a1b7daf..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for PubsubTestClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubTestClientTest {
-  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
-  private static final SubscriptionPath SUBSCRIPTION =
-      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
-  private static final long REQ_TIME = 1234L;
-  private static final long MESSAGE_TIME = 6789L;
-  private static final String MESSAGE_ID = "testMessageId";
-  private static final String DATA = "testData";
-  private static final String ACK_ID = "testAckId";
-  private static final int ACK_TIMEOUT_S = 60;
-
-  @Test
-  public void pullOneMessage() throws IOException {
-    final AtomicLong now = new AtomicLong();
-    Clock clock = new Clock() {
-      @Override
-      public long currentTimeMillis() {
-        return now.get();
-      }
-    };
-    IncomingMessage expectedIncomingMessage =
-        new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
-    try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
-                                                   Lists.newArrayList(expectedIncomingMessage))) {
-      try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
-        now.set(REQ_TIME);
-        client.advance();
-        List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
-        assertEquals(1, incomingMessages.size());
-        assertEquals(expectedIncomingMessage, incomingMessages.get(0));
-        // Timeout on ACK.
-        now.addAndGet((ACK_TIMEOUT_S + 10) * 1000);
-        client.advance();
-        incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
-        assertEquals(1, incomingMessages.size());
-        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
-        now.addAndGet(10 * 1000);
-        client.advance();
-        // Extend ack
-        client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
-        // Timeout on extended ACK
-        now.addAndGet(30 * 1000);
-        client.advance();
-        incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
-        assertEquals(1, incomingMessages.size());
-        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
-        // Extend ack
-        client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
-        // Ack
-        now.addAndGet(15 * 1000);
-        client.advance();
-        client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
-      }
-    }
-  }
-
-  @Test
-  public void publishOneMessage() throws IOException {
-    OutgoingMessage expectedOutgoingMessage =
-        new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID);
-    try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(
-                 TOPIC,
-                 Sets.newHashSet(expectedOutgoingMessage),
-                 ImmutableList.<OutgoingMessage>of())) {
-      try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
-        client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 1a2e100..d22c6c5 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -84,6 +84,16 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>grpc-google-pubsub-v1</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.auto.service</groupId>
       <artifactId>auto-service</artifactId>
       <optional>true</optional>
@@ -106,10 +116,44 @@
 
     <dependency>
       <groupId>io.grpc</groupId>
+      <artifactId>grpc-auth</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
       <artifactId>grpc-core</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <!-- grpc-all does not obey IWYU ("include what you use"), so we exclude it
+     from compile scope and depend on it only at runtime. -->
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
new file mode 100644
index 0000000..750178c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.DateTime;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * An (abstract) helper class for talking to Pubsub via an underlying transport.
+ */
+abstract class PubsubClient implements Closeable {
+  /**
+   * Factory for creating clients.
+   */
+  public interface PubsubClientFactory extends Serializable {
+    /**
+     * Construct a new Pubsub client. It should be closed via {@link #close} in order
+     * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
+     * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
+     * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
+     * timestamps/ids within message metadata.
+     */
+    PubsubClient newClient(
+        @Nullable String timestampLabel,
+        @Nullable String idLabel,
+        PubsubOptions options) throws IOException;
+
+    /**
+     * Return the display name for this factory. Eg "Json", "gRPC".
+     */
+    String getKind();
+  }
+
+  /**
+   * Return {@code timestamp} as ms-since-unix-epoch, accepting either a decimal ms-since-epoch
+   * value or an RFC 3339 string. Return {@literal null} if {@code timestamp} is null or empty.
+   * Throw {@link IllegalArgumentException} if it matches neither format.
+   */
+  @Nullable
+  private static Long asMsSinceEpoch(@Nullable String timestamp) {
+    if (Strings.isNullOrEmpty(timestamp)) {
+      return null;
+    }
+    try {
+      // First try to parse as decimal milliseconds since epoch. An RFC 3339 string is
+      // not a valid decimal long, so Long.parseLong rejects it with a
+      // NumberFormatException (a subclass of IllegalArgumentException) and we fall
+      // back to RFC 3339 parsing below.
+      return Long.parseLong(timestamp);
+    } catch (IllegalArgumentException e1) {
+      // Fall back to RFC 3339. DateTime.parseRfc3339 will throw an
+      // IllegalArgumentException if parsing fails; that propagates to the caller.
+      return DateTime.parseRfc3339(timestamp).getValue();
+    }
+  }
+
+  /**
+   * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
+   * attributes} and {@code pubsubTimestamp}.
+   *
+   * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
+   * that label, and the value of that label will be taken as the timestamp.
+   * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
+   * pubsubTimestamp}.
+   *
+   * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
+   * or RFC3339 time.
+   */
+  protected static long extractTimestamp(
+      @Nullable String timestampLabel,
+      @Nullable String pubsubTimestamp,
+      @Nullable Map<String, String> attributes) {
+    if (Strings.isNullOrEmpty(timestampLabel)) {
+      // No custom label configured: use the Pubsub publish timestamp.
+      Long publishMs = asMsSinceEpoch(pubsubTimestamp);
+      checkArgument(publishMs != null,
+                    "Cannot interpret PubSub publish timestamp: %s", pubsubTimestamp);
+      return publishMs;
+    }
+    // A custom label is configured: the attribute must be present and parseable.
+    String value = attributes == null ? null : attributes.get(timestampLabel);
+    checkArgument(value != null,
+                  "PubSub message is missing a value for timestamp label %s",
+                  timestampLabel);
+    Long labelMs = asMsSinceEpoch(value);
+    checkArgument(labelMs != null,
+                  "Cannot interpret value of label %s as timestamp: %s",
+                  timestampLabel, value);
+    return labelMs;
+  }
+
+  /**
+   * Path representing a cloud project id.
+   */
+  static class ProjectPath implements Serializable {
+    private final String projectId;
+
+    /**
+     * Creates a {@link ProjectPath} from a {@link String} representation, which
+     * must be of the form {@code "projects/" + projectId}.
+     */
+    ProjectPath(String path) {
+      String[] splits = path.split("/");
+      checkArgument(
+          splits.length == 2 && splits[0].equals("projects"),
+          "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>",
+          path);
+      this.projectId = splits[1];
+    }
+
+    public String getPath() {
+      return String.format("projects/%s", projectId);
+    }
+
+    public String getId() {
+      return projectId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ProjectPath that = (ProjectPath) o;
+
+      return projectId.equals(that.projectId);
+    }
+
+    @Override
+    public int hashCode() {
+      return projectId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return getPath();
+    }
+  }
+
+  public static ProjectPath projectPathFromPath(String path) {
+    return new ProjectPath(path);
+  }
+
+  public static ProjectPath projectPathFromId(String projectId) {
+    return new ProjectPath(String.format("projects/%s", projectId));
+  }
+
+  /**
+   * Path representing a Pubsub subscription.
+   */
+  public static class SubscriptionPath implements Serializable {
+    private final String projectId;
+    private final String subscriptionName;
+
+    SubscriptionPath(String path) {
+      String[] splits = path.split("/"); // Expect: projects, <id>, subscriptions, <name>.
+      checkState(
+          splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"),
+          "Malformed subscription path %s: must be of the form "
+          + "\"projects/\" + <project id> + \"/subscriptions/\" + <subscription name>", path);
+      this.projectId = splits[1];
+      this.subscriptionName = splits[3];
+    }
+
+    public String getPath() {
+      return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
+    }
+
+    public String getName() {
+      return subscriptionName;
+    }
+
+    public String getV1Beta1Path() {
+      return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SubscriptionPath that = (SubscriptionPath) o;
+      return this.subscriptionName.equals(that.subscriptionName)
+          && this.projectId.equals(that.projectId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(projectId, subscriptionName);
+    }
+
+    @Override
+    public String toString() {
+      return getPath();
+    }
+  }
+
+  /**
+   * Return a {@link SubscriptionPath} parsed from the full subscription path {@code path}.
+   */
+  public static SubscriptionPath subscriptionPathFromPath(String path) {
+    SubscriptionPath subscriptionPath = new SubscriptionPath(path);
+    return subscriptionPath;
+  }
+
+  /**
+   * Return the {@link SubscriptionPath} for the subscription {@code subscriptionName}
+   * within project {@code projectId}.
+   */
+  public static SubscriptionPath subscriptionPathFromName(
+      String projectId, String subscriptionName) {
+    String fullPath =
+        String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
+    return new SubscriptionPath(fullPath);
+  }
+
+  /**
+   * Path representing a Pubsub topic.
+   *
+   * <p>The path is stored as supplied; its shape is only checked when one of its
+   * components is requested.
+   */
+  public static class TopicPath implements Serializable {
+    private final String path;
+
+    TopicPath(String path) {
+      this.path = path;
+    }
+
+    /**
+     * Return the full path exactly as supplied at construction.
+     */
+    public String getPath() {
+      return path;
+    }
+
+    /**
+     * Return the fourth {@code '/'}-separated component of the path.
+     */
+    public String getName() {
+      return components()[3];
+    }
+
+    /**
+     * Return the path in the form {@code /topics/<second component>/<fourth component>}.
+     */
+    public String getV1Beta1Path() {
+      String[] parts = components();
+      return String.format("/topics/%s/%s", parts[1], parts[3]);
+    }
+
+    /**
+     * Split the path on {@code '/'}, failing unless the split yields four pieces.
+     */
+    private String[] components() {
+      String[] parts = path.split("/");
+      checkState(parts.length == 4, "Malformed topic path %s", path);
+      return parts;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null) {
+        return false;
+      }
+      if (o.getClass() != getClass()) {
+        return false;
+      }
+      TopicPath other = (TopicPath) o;
+      return path.equals(other.path);
+    }
+
+    @Override
+    public int hashCode() {
+      return path.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return path;
+    }
+  }
+
+  /**
+   * Return a {@link TopicPath} wrapping the full topic path {@code path}.
+   */
+  public static TopicPath topicPathFromPath(String path) {
+    TopicPath topicPath = new TopicPath(path);
+    return topicPath;
+  }
+
+  /**
+   * Return the {@link TopicPath} for the topic {@code topicName} within project
+   * {@code projectId}.
+   */
+  public static TopicPath topicPathFromName(String projectId, String topicName) {
+    String fullPath = String.format("projects/%s/topics/%s", projectId, topicName);
+    return new TopicPath(fullPath);
+  }
+
+  /**
+   * A message to be sent to Pubsub.
+   *
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  static class OutgoingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    /**
+     * Attributes to attach to the message.
+     */
+    public final Map<String, String> attributes;
+
+    /**
+     * Timestamp for element (ms since epoch).
+     */
+    public final long timestampMsSinceEpoch;
+
+    /**
+     * If using an id label, the record id to associate with this record's metadata so the receiver
+     * can reject duplicates. Otherwise {@literal null}.
+     */
+    @Nullable
+    public final String recordId;
+
+    public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
+                           long timestampMsSinceEpoch, @Nullable String recordId) {
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.recordId = recordId;
+    }
+
+    @Override
+    public String toString() {
+      int payloadSize = elementBytes.length;
+      return String.format("OutgoingMessage(%db, %dms)", payloadSize, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+
+      OutgoingMessage other = (OutgoingMessage) o;
+
+      // Compare field by field, cheapest check first, bailing out on the first mismatch.
+      if (timestampMsSinceEpoch != other.timestampMsSinceEpoch) {
+        return false;
+      }
+      if (!Arrays.equals(elementBytes, other.elementBytes)) {
+        return false;
+      }
+      if (!Objects.equal(attributes, other.attributes)) {
+        return false;
+      }
+      return Objects.equal(recordId, other.recordId);
+    }
+
+    @Override
+    public int hashCode() {
+      int payloadHash = Arrays.hashCode(elementBytes);
+      return Objects.hashCode(payloadHash, attributes, timestampMsSinceEpoch, recordId);
+    }
+  }
+
+  /**
+   * A message received from Pubsub.
+   *
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  static class IncomingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    /**
+     * Attributes received with the message.
+     */
+    public Map<String, String> attributes;
+
+    /**
+     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
+     * or the custom timestamp associated with the message.
+     */
+    public final long timestampMsSinceEpoch;
+
+    /**
+     * Timestamp (in system time) at which we requested the message (ms since epoch).
+     */
+    public final long requestTimeMsSinceEpoch;
+
+    /**
+     * Id to pass back to Pubsub to acknowledge receipt of this message.
+     */
+    public final String ackId;
+
+    /**
+     * Id to pass to the runner to distinguish this message from all others.
+     */
+    public final String recordId;
+
+    public IncomingMessage(
+        byte[] elementBytes,
+        Map<String, String> attributes,
+        long timestampMsSinceEpoch,
+        long requestTimeMsSinceEpoch,
+        String ackId,
+        String recordId) {
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+      this.ackId = ackId;
+      this.recordId = recordId;
+    }
+
+    /**
+     * Return a copy of this message whose request time is {@code requestTimeMsSinceEpoch};
+     * all other fields are carried over unchanged.
+     */
+    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+      return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+              requestTimeMsSinceEpoch, ackId, recordId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("IncomingMessage(%db, %dms)",
+                           elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      IncomingMessage that = (IncomingMessage) o;
+
+      // Use the null-tolerant Objects.equal for the ids: the constructor does not reject
+      // null, and hashCode() below already tolerates it, so equals() must not throw either.
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+             && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+             && Objects.equal(ackId, that.ackId)
+             && Objects.equal(recordId, that.recordId)
+             && Arrays.equals(elementBytes, that.elementBytes)
+             && Objects.equal(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+                              requestTimeMsSinceEpoch,
+                              ackId, recordId);
+    }
+  }
+
+  /**
+   * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+   * published.
+   *
+   * @param topic the topic to publish to
+   * @param outgoingMessages the messages to publish
+   * @return the number of messages published
+   */
+  public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException;
+
+  /**
+   * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+   * Return the received messages, or empty collection if none were available. Does not
+   * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
+   * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
+   *
+   * @param requestTimeMsSinceEpoch request time (ms since epoch) to record on each message
+   * @param subscription the subscription to pull from
+   * @param batchSize the maximum number of messages to return
+   * @param returnImmediately if {@literal true}, do not wait for messages to arrive
+   * @return the received messages, possibly empty
+   */
+  public abstract List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately)
+      throws IOException;
+
+  /**
+   * Acknowledge messages from {@code subscription} with {@code ackIds}.
+   *
+   * @param subscription the subscription the messages were pulled from
+   * @param ackIds the ack ids of the messages to acknowledge
+   */
+  public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+      throws IOException;
+
+  /**
+   * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+   * be {@code deadlineSeconds} from now.
+   *
+   * @param subscription the subscription the messages were pulled from
+   * @param ackIds the ack ids of the messages whose deadline should change
+   * @param deadlineSeconds the new deadline, in seconds from now
+   */
+  public abstract void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds,
+      int deadlineSeconds) throws IOException;
+
+  /**
+   * Create {@code topic}.
+   *
+   * @param topic path of the topic to create
+   */
+  public abstract void createTopic(TopicPath topic) throws IOException;
+
+  /**
+   * Delete {@code topic}.
+   *
+   * @param topic path of the topic to delete
+   */
+  public abstract void deleteTopic(TopicPath topic) throws IOException;
+
+  /**
+   * Return a list of topics for {@code project}.
+   *
+   * @param project the project whose topics should be listed
+   * @return the paths of the topics in {@code project}
+   */
+  public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+  /**
+   * Create {@code subscription} to {@code topic}.
+   *
+   * @param topic the topic to subscribe to
+   * @param subscription path of the subscription to create
+   * @param ackDeadlineSeconds the ack deadline, in seconds, for the new subscription
+   */
+  public abstract void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
+
+  /**
+   * Create a subscription to {@code topic} in {@code project} under a randomly generated
+   * name and return its {@link SubscriptionPath}. The caller is responsible for deleting
+   * the subscription once it is no longer needed.
+   */
+  public SubscriptionPath createRandomSubscription(
+      ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
+    // The name is the topic name followed by "_beam_" and a random long.
+    String topicName = topic.getName();
+    long randomSuffix = ThreadLocalRandom.current().nextLong();
+    String randomName = topicName + "_beam_" + randomSuffix;
+    SubscriptionPath randomSubscription =
+        PubsubClient.subscriptionPathFromName(project.getId(), randomName);
+    createSubscription(topic, randomSubscription, ackDeadlineSeconds);
+    return randomSubscription;
+  }
+
+  /**
+   * Delete {@code subscription}.
+   *
+   * @param subscription path of the subscription to delete
+   */
+  public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+  /**
+   * Return a list of subscriptions for {@code topic} in {@code project}.
+   *
+   * @param project the project whose subscriptions should be listed
+   * @param topic the topic the returned subscriptions must be attached to
+   * @return the paths of the matching subscriptions
+   */
+  public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException;
+
+  /**
+   * Return the ack deadline, in seconds, for {@code subscription}.
+   *
+   * @param subscription the subscription to query
+   * @return the subscription's ack deadline in seconds
+   */
+  public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+
+  /**
+   * Return {@literal true} if {@link #pull} will always return empty list. Actual clients
+   * will return {@literal false}. Test clients may return {@literal true} to signal that all
+   * expected messages have been pulled and the test may complete.
+   *
+   * @return {@literal true} if no further messages will ever be returned by {@link #pull}
+   */
+  public abstract boolean isEOF();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
new file mode 100644
index 0000000..912d59c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auth.Credentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeleteSubscriptionRequest;
+import com.google.pubsub.v1.DeleteTopicRequest;
+import com.google.pubsub.v1.GetSubscriptionRequest;
+import com.google.pubsub.v1.ListSubscriptionsRequest;
+import com.google.pubsub.v1.ListSubscriptionsResponse;
+import com.google.pubsub.v1.ListTopicsRequest;
+import com.google.pubsub.v1.ListTopicsResponse;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * A helper class for talking to Pubsub via grpc.
+ *
+ * <p>CAUTION: Currently uses the application default credentials and does not respect any
+ * credentials-related arguments in {@link GcpOptions}.
+ */
+class PubsubGrpcClient extends PubsubClient {
+  private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
+  private static final int PUBSUB_PORT = 443;
+  private static final int LIST_BATCH_SIZE = 1000;
+
+  private static final int DEFAULT_TIMEOUT_S = 15;
+
+  private static class PubsubGrpcClientFactory implements PubsubClientFactory {
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        throws IOException {
+      ManagedChannel channel = NettyChannelBuilder
+          .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+          .negotiationType(NegotiationType.TLS)
+          .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+          .build();
+
+      return new PubsubGrpcClient(timestampLabel,
+                                  idLabel,
+                                  DEFAULT_TIMEOUT_S,
+                                  channel,
+                                  options.getGcpCredential());
+    }
+
+    @Override
+    public String getKind() {
+      return "Grpc";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using gRPC transport.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory();
+
+  /**
+   * Timeout for grpc calls (in s).
+   */
+  private final int timeoutSec;
+
+  /**
+   * Underlying netty channel, or {@literal null} if closed.
+   */
+  @Nullable
+  private ManagedChannel publisherChannel;
+
+  /**
+   * Credentials determined from options and environment.
+   */
+  private final Credentials credentials;
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   */
+  @Nullable
+  private final String idLabel;
+
+
+  /**
+   * Cached stubs, or null if not cached.
+   */
+  @Nullable
+  private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
+  @Nullable
+  private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
+
+  @VisibleForTesting
+  PubsubGrpcClient(
+      @Nullable String timestampLabel,
+      @Nullable String idLabel,
+      int timeoutSec,
+      ManagedChannel publisherChannel,
+      Credentials credentials) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.timeoutSec = timeoutSec;
+    this.publisherChannel = publisherChannel;
+    this.credentials = credentials;
+  }
+
+  /**
+   * Gracefully close the underlying netty channel. Calling this on an already closed
+   * client is a no-op.
+   */
+  @Override
+  public void close() {
+    if (publisherChannel == null) {
+      // Already closed.
+      return;
+    }
+    // Can gc the underlying stubs.
+    cachedPublisherStub = null;
+    cachedSubscriberStub = null;
+    // Mark the client as having been closed before going further
+    // in case we have an exception from the channel.
+    ManagedChannel publisherChannel = this.publisherChannel;
+    this.publisherChannel = null;
+    // Gracefully shutdown the channel.
+    publisherChannel.shutdown();
+    try {
+      publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Stop waiting, but preserve the interrupt for the caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Return channel with interceptor for returning credentials.
+   */
+  private Channel newChannel() throws IOException {
+    checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
+    // NOTE(review): this executor is never shut down. Because the stubs are cached, at most
+    // one is created per stub per client -- confirm the interceptor does not require it to
+    // be managed.
+    ClientAuthInterceptor interceptor =
+        new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
+    return ClientInterceptors.intercept(publisherChannel, interceptor);
+  }
+
+  /**
+   * Return a stub for making a publish request with a timeout.
+   */
+  private PublisherBlockingStub publisherStub() throws IOException {
+    if (cachedPublisherStub == null) {
+      cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
+    }
+    return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Return a stub for making a subscribe request with a timeout.
+   */
+  private SubscriberBlockingStub subscriberStub() throws IOException {
+    if (cachedSubscriberStub == null) {
+      cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
+    }
+    return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    PublishRequest.Builder request = PublishRequest.newBuilder()
+                                                   .setTopic(topic.getPath());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage.Builder message =
+          PubsubMessage.newBuilder()
+                       .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+      if (outgoingMessage.attributes != null) {
+        message.putAllAttributes(outgoingMessage.attributes);
+      }
+
+      // The timestamp and id labels are written after the user attributes, so they
+      // overwrite any user attribute with the same key.
+      if (timestampLabel != null) {
+        message.getMutableAttributes()
+               .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
+      }
+
+      request.addMessages(message);
+    }
+
+    PublishResponse response = publisherStub().publish(request.build());
+    return response.getMessageIdsCount();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = PullRequest.newBuilder()
+                                     .setSubscription(subscription.getPath())
+                                     .setReturnImmediately(returnImmediately)
+                                     .setMaxMessages(batchSize)
+                                     .build();
+    PullResponse response = subscriberStub().pull(request);
+    if (response.getReceivedMessagesCount() == 0) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
+    for (ReceivedMessage message : response.getReceivedMessagesList()) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
+      // Timestamp. The publish time arrives as (seconds, nanos); convert both parts to
+      // milliseconds before summing so the result is ms since epoch.
+      String pubsubTimestampString = null;
+      Timestamp timestampProto = pubsubMessage.getPublishTime();
+      if (timestampProto != null) {
+        pubsubTimestampString =
+            String.valueOf(TimeUnit.SECONDS.toMillis(timestampProto.getSeconds())
+                           + TimeUnit.NANOSECONDS.toMillis(timestampProto.getNanos()));
+      }
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any.
+      @Nullable String recordId = null;
+      if (idLabel != null && attributes != null) {
+        recordId = attributes.get(idLabel);
+      }
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+                                               requestTimeMsSinceEpoch, ackId, recordId));
+    }
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+      throws IOException {
+    AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
+                                                   .setSubscription(subscription.getPath())
+                                                   .addAllAckIds(ackIds)
+                                                   .build();
+    subscriberStub().acknowledge(request); // ignore Empty result.
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest request =
+        ModifyAckDeadlineRequest.newBuilder()
+                                .setSubscription(subscription.getPath())
+                                .addAllAckIds(ackIds)
+                                .setAckDeadlineSeconds(deadlineSeconds)
+                                .build();
+    subscriberStub().modifyAckDeadline(request); // ignore Empty result.
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    Topic request = Topic.newBuilder()
+                         .setName(topic.getPath())
+                         .build();
+    publisherStub().createTopic(request); // ignore Topic result.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
+                                                   .setTopic(topic.getPath())
+                                                   .build();
+    publisherStub().deleteTopic(request); // ignore Empty result.
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    ListTopicsRequest.Builder request =
+        ListTopicsRequest.newBuilder()
+                         .setProject(project.getPath())
+                         .setPageSize(LIST_BATCH_SIZE);
+    ListTopicsResponse response = publisherStub().listTopics(request.build());
+    if (response.getTopicsCount() == 0) {
+      return ImmutableList.of();
+    }
+    List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
+    // Follow next-page tokens until the listing is exhausted.
+    while (true) {
+      for (Topic topic : response.getTopicsList()) {
+        topics.add(topicPathFromPath(topic.getName()));
+      }
+      if (response.getNextPageToken().isEmpty()) {
+        break;
+      }
+      request.setPageToken(response.getNextPageToken());
+      response = publisherStub().listTopics(request.build());
+    }
+    return topics;
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription request = Subscription.newBuilder()
+                                       .setTopic(topic.getPath())
+                                       .setName(subscription.getPath())
+                                       .setAckDeadlineSeconds(ackDeadlineSeconds)
+                                       .build();
+    subscriberStub().createSubscription(request); // ignore Subscription result.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    DeleteSubscriptionRequest request =
+        DeleteSubscriptionRequest.newBuilder()
+                                 .setSubscription(subscription.getPath())
+                                 .build();
+    subscriberStub().deleteSubscription(request); // ignore Empty result.
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException {
+    ListSubscriptionsRequest.Builder request =
+        ListSubscriptionsRequest.newBuilder()
+                                .setProject(project.getPath())
+                                .setPageSize(LIST_BATCH_SIZE);
+    ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
+    if (response.getSubscriptionsCount() == 0) {
+      return ImmutableList.of();
+    }
+    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
+    // The request lists every subscription in the project; keep only those attached to
+    // the requested topic, following next-page tokens until the listing is exhausted.
+    while (true) {
+      for (Subscription subscription : response.getSubscriptionsList()) {
+        if (subscription.getTopic().equals(topic.getPath())) {
+          subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+        }
+      }
+      if (response.getNextPageToken().isEmpty()) {
+        break;
+      }
+      request.setPageToken(response.getNextPageToken());
+      response = subscriberStub().listSubscriptions(request.build());
+    }
+    return subscriptions;
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    GetSubscriptionRequest request =
+        GetSubscriptionRequest.newBuilder()
+                              .setSubscription(subscription.getPath())
+                              .build();
+    Subscription response = subscriberStub().getSubscription(request);
+    return response.getAckDeadlineSeconds();
+  }
+
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
+}