You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/04 15:23:04 UTC

[1/3] incubator-beam git commit: Closes #213

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3f0eead50 -> 1ef53b17d


Closes #213


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ef53b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ef53b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ef53b17

Branch: refs/heads/master
Commit: 1ef53b17d33016c99aab67ceb3a942e3da92e225
Parents: 3f0eead 2c509a8
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 4 08:22:47 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 08:22:47 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/PubsubClient.java    | 323 ------------
 .../apache/beam/sdk/io/PubsubGrpcClient.java    | 403 ---------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 252 ++++------
 .../beam/sdk/util/PubsubApiaryClient.java       | 293 +++++++++++
 .../org/apache/beam/sdk/util/PubsubClient.java  | 489 +++++++++++++++++++
 .../apache/beam/sdk/util/PubsubGrpcClient.java  | 433 ++++++++++++++++
 .../apache/beam/sdk/util/PubsubTestClient.java  | 291 +++++++++++
 .../org/apache/beam/sdk/util/Transport.java     |   4 +
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 157 ------
 .../beam/sdk/util/PubsubApiaryClientTest.java   | 134 +++++
 .../apache/beam/sdk/util/PubsubClientTest.java  | 189 +++++++
 .../beam/sdk/util/PubsubGrpcClientTest.java     | 170 +++++++
 .../beam/sdk/util/PubsubTestClientTest.java     |  97 ++++
 13 files changed, 2205 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: [BEAM-53] Add PubsubApiaryClient, PubsubTestClient

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
new file mode 100644
index 0000000..4a47c30
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods.
+ */
+public class PubsubTestClient extends PubsubClient {
+  public static PubsubClientFactory createFactoryForPublish(
+      final TopicPath expectedTopic,
+      final Set<OutgoingMessage> expectedOutgoingMessages) {
+    return new PubsubClientFactory() {  // Publish-only: no subscription or incoming messages.
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {  // Labels and options are ignored.
+        return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
+      }
+    };
+  }
+
+  public static PubsubClientFactory createFactoryForPull(
+      @Nullable final SubscriptionPath expectedSubscription,
+      final int ackTimeoutSec,
+      @Nullable final List<IncomingMessage> expectedIncomingMessages) {
+    return new PubsubClientFactory() {  // Pull-only: no topic or expected outgoing messages.
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {  // Labels and options are ignored.
+        return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
+                                    null, expectedIncomingMessages);
+      }
+    };
+  }
+
+  /**
+   * Only publish calls for this topic are allowed.
+   */
+  @Nullable
+  private TopicPath expectedTopic;
+  /**
+   * Only pull calls for this subscription are allowed.
+   */
+  @Nullable
+  private SubscriptionPath expectedSubscription;
+
+  /**
+   * Timeout to simulate.
+   */
+  private int ackTimeoutSec;
+
+  /**
+   * Messages yet to be seen in a {@link #publish} call.
+   */
+  @Nullable
+  private Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+  /**
+   * Messages waiting to be received by a {@link #pull} call.
+   */
+  @Nullable
+  private List<IncomingMessage> remainingPendingIncomingMessages;
+
+  /**
+   * Messages which have been returned from a {@link #pull} call and
+   * not yet ACKed by an {@link #acknowledge} call.
+   */
+  private Map<String, IncomingMessage> pendingAckIncommingMessages;
+
+  /**
+   * When above messages are due to have their ACK deadlines expire.
+   */
+  private Map<String, Long> ackDeadline;
+
+  /**
+   * Current time.
+   */
+  private long nowMsSinceEpoch;
+
+  @VisibleForTesting
+  PubsubTestClient(
+      @Nullable TopicPath expectedTopic,
+      @Nullable SubscriptionPath expectedSubscription,
+      int ackTimeoutSec,
+      @Nullable Set<OutgoingMessage> expectedOutgoingMessages,
+      @Nullable List<IncomingMessage> expectedIncomingMessages) {
+    this.expectedTopic = expectedTopic;
+    this.expectedSubscription = expectedSubscription;
+    this.ackTimeoutSec = ackTimeoutSec;
+
+    this.remainingExpectedOutgoingMessages = expectedOutgoingMessages;  // Not copied.
+    this.remainingPendingIncomingMessages = expectedIncomingMessages;  // Not copied.
+
+    this.pendingAckIncommingMessages = new HashMap<>();  // Keyed by ACK id.
+    this.ackDeadline = new HashMap<>();  // ACK id -> deadline in ms since epoch.
+    this.nowMsSinceEpoch = Long.MIN_VALUE;  // Lets the first advanceTo() accept any target time.
+  }
+
+  /**
+   * Advance simulated time to {@code newNowMsSinceEpoch}, which must not be earlier than the
+   * current time. Messages whose ACK deadline has then passed can be returned by pull again.
+   */
+  public void advanceTo(long newNowMsSinceEpoch) {
+    checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch,  // Guava templates only expand %s.
+                  "Cannot advance time backwards from %s to %s", nowMsSinceEpoch,
+                  newNowMsSinceEpoch);
+    nowMsSinceEpoch = newNowMsSinceEpoch;
+    // Any messages whose ACKs timed out are available for re-pulling.
+    Iterator<Map.Entry<String, Long>> deadlineItr = ackDeadline.entrySet().iterator();
+    while (deadlineItr.hasNext()) {
+      Map.Entry<String, Long> entry = deadlineItr.next();
+      if (entry.getValue() <= nowMsSinceEpoch) {
+        remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey()));
+        deadlineItr.remove();
+      }
+    }
+  }
+
+  @Override
+  public void close() {  // Verifies all expectations were met; no-op once it has succeeded.
+    if (remainingExpectedOutgoingMessages != null) {
+      checkState(this.remainingExpectedOutgoingMessages.isEmpty(),
+                 "Failed to publish %s messages", this.remainingExpectedOutgoingMessages.size());
+      remainingExpectedOutgoingMessages = null;
+    }
+    if (remainingPendingIncomingMessages != null) {
+      checkState(remainingPendingIncomingMessages.isEmpty(),
+                 "Failed to pull %s messages", remainingPendingIncomingMessages.size());
+      checkState(pendingAckIncommingMessages.isEmpty(),
+                 "Failed to ACK %s messages", pendingAckIncommingMessages.size());
+      checkState(ackDeadline.isEmpty(),
+                 "Failed to ACK %s messages", ackDeadline.size());
+      remainingPendingIncomingMessages = null;
+      pendingAckIncommingMessages = null;
+      ackDeadline = null;
+    }
+  }
+
+  @Override
+  public int publish(
+      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+    checkNotNull(expectedTopic, "Missing expected topic");
+    checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages");
+    checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic,
+               expectedTopic);
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {  // Each consumes an expectation.
+      checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage),
+                 "Unexpected outgoing message %s", outgoingMessage);
+    }
+    return outgoingMessages.size();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+      boolean returnImmediately) throws IOException {
+    checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch,  // Guava templates only expand %s.
+               "Simulated time %s does not match request time %s", nowMsSinceEpoch,
+               requestTimeMsSinceEpoch);
+    checkNotNull(expectedSubscription, "Missing expected subscription");
+    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+    checkState(subscription.equals(expectedSubscription),
+               "Subscription %s does not match expected %s", subscription, expectedSubscription);
+    checkState(returnImmediately, "PubsubTestClient only supports returning immediately");
+
+    List<IncomingMessage> incomingMessages = new ArrayList<>();
+    Iterator<IncomingMessage> pendItr = remainingPendingIncomingMessages.iterator();
+    while (pendItr.hasNext()) {
+      IncomingMessage incomingMessage = pendItr.next();
+      pendItr.remove();  // Now awaiting ACK rather than pending.
+      IncomingMessage incomingMessageWithRequestTime =
+          incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+      incomingMessages.add(incomingMessageWithRequestTime);
+      pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId,
+                                      incomingMessageWithRequestTime);
+      ackDeadline.put(incomingMessageWithRequestTime.ackId,
+                      requestTimeMsSinceEpoch + ackTimeoutSec * 1000L);  // Multiply as long.
+      if (incomingMessages.size() >= batchSize) {
+        break;
+      }
+    }
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(
+      SubscriptionPath subscription,
+      List<String> ackIds) throws IOException {
+    checkNotNull(expectedSubscription, "Missing expected subscription");
+    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+    checkState(subscription.equals(expectedSubscription),
+               "Subscription %s does not match expected %s", subscription, expectedSubscription);
+
+    for (String ackId : ackIds) {  // Each id must belong to a pulled, un-expired message.
+      checkState(ackDeadline.remove(ackId) != null,
+                 "No message with ACK id %s is outstanding", ackId);
+      checkState(pendingAckIncommingMessages.remove(ackId) != null,
+                 "No message with ACK id %s is outstanding", ackId);
+    }
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+    checkNotNull(expectedSubscription, "Missing expected subscription");
+    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+    checkState(subscription.equals(expectedSubscription),
+               "Subscription %s does not match expected %s", subscription, expectedSubscription);
+
+    for (String ackId : ackIds) {  // New deadline is relative to the simulated now.
+      checkState(ackDeadline.remove(ackId) != null,
+                 "No message with ACK id %s is outstanding", ackId);
+      checkState(pendingAckIncommingMessages.containsKey(ackId),
+                 "No message with ACK id %s is outstanding", ackId);
+      ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * 1000L);  // Multiply as long.
+    }
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();  // Not simulated by this test client.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();  // Not simulated by this test client.
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    throw new UnsupportedOperationException();  // Not simulated by this test client.
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+    throw new UnsupportedOperationException();  // Not simulated by this test client.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    throw new UnsupportedOperationException();  // Not simulated by this test client.
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(
+      ProjectPath project, TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();  // Not simulated by this test client.
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    return ackTimeoutSec;  // The subscription argument is not checked.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 9082ce3..6daecdb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -105,7 +105,11 @@ public class Transport {
 
   /**
    * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+   *
+   * @deprecated Use an appropriate
+   * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory}
    */
+  @Deprecated
   public static Pubsub.Builder
       newPubsubClient(PubsubOptions options) {
     return new Pubsub.Builder(getTransport(), getJsonFactory(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 1e5bf51..eaf452d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -24,22 +24,13 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
-import com.google.api.services.pubsub.model.PubsubMessage;
-
 import org.joda.time.Duration;
-import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.HashMap;
-
-import javax.annotation.Nullable;
-
 /**
  * Tests for PubsubIO Read and Write transforms.
  */
@@ -90,154 +81,6 @@ public class PubsubIOTest {
         .toString());
   }
 
-  /**
-   * Helper function that creates a {@link PubsubMessage} with the given timestamp registered as
-   * an attribute with the specified label.
-   *
-   * <p>If {@code label} is {@code null}, then the attributes are {@code null}.
-   *
-   * <p>Else, if {@code timestamp} is {@code null}, then attributes are present but have no key for
-   * the label.
-   */
-  private static PubsubMessage messageWithTimestamp(
-      @Nullable String label, @Nullable String timestamp) {
-    PubsubMessage message = new PubsubMessage();
-    if (label == null) {
-      message.setAttributes(null);
-      return message;
-    }
-
-    message.setAttributes(new HashMap<String, String>());
-
-    if (timestamp == null) {
-      return message;
-    }
-
-    message.getAttributes().put(label, timestamp);
-    return message;
-  }
-
-  /**
-   * Helper function that parses the given string to a timestamp through the PubSubIO plumbing.
-   */
-  private static Instant parseTimestamp(@Nullable String timestamp) {
-    PubsubMessage message = messageWithTimestamp("mylabel", timestamp);
-    return PubsubIO.assignMessageTimestamp(message, "mylabel", Clock.SYSTEM);
-  }
-
-  @Test
-  public void noTimestampLabelReturnsNow() {
-    final long time = 987654321L;
-    Instant timestamp = PubsubIO.assignMessageTimestamp(
-        messageWithTimestamp(null, null), null, new FixedClock(time));
-
-    assertEquals(new Instant(time), timestamp);
-  }
-
-  @Test
-  public void timestampLabelWithNullAttributesThrowsError() {
-    PubsubMessage message = messageWithTimestamp(null, null);
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel");
-
-    PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM);
-  }
-
-  @Test
-  public void timestampLabelSetWithMissingAttributeThrowsError() {
-    PubsubMessage message = messageWithTimestamp("notMyLabel", "ignored");
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel");
-
-    PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM);
-  }
-
-  @Test
-  public void timestampLabelParsesMillisecondsSinceEpoch() {
-    Long millis = 1446162101123L;
-    assertEquals(new Instant(millis), parseTimestamp(millis.toString()));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Seconds() {
-    String rfc3339 = "2015-10-29T23:41:41Z";
-    assertEquals(Instant.parse(rfc3339), parseTimestamp(rfc3339));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Tenths() {
-    String rfc3339tenths = "2015-10-29T23:41:41.1Z";
-    assertEquals(Instant.parse(rfc3339tenths), parseTimestamp(rfc3339tenths));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Hundredths() {
-    String rfc3339hundredths = "2015-10-29T23:41:41.12Z";
-    assertEquals(Instant.parse(rfc3339hundredths), parseTimestamp(rfc3339hundredths));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Millis() {
-    String rfc3339millis = "2015-10-29T23:41:41.123Z";
-    assertEquals(Instant.parse(rfc3339millis), parseTimestamp(rfc3339millis));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Micros() {
-    String rfc3339micros = "2015-10-29T23:41:41.123456Z";
-    assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros));
-    // Note: micros part 456/1000 is dropped.
-    assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339MicrosRounding() {
-    String rfc3339micros = "2015-10-29T23:41:41.123999Z";
-    assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros));
-    // Note: micros part 999/1000 is dropped, not rounded up.
-    assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros));
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormatThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parseTimestamp("not-a-timestamp");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormat2ThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parseTimestamp("null");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormat3ThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parseTimestamp("2015-10");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339WithSmallYear() {
-    // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
-    // This is therefore a "small year" until this difference is reconciled.
-    String rfc3339SmallYear = "1582-10-15T01:23:45.123Z";
-    assertEquals(Instant.parse(rfc3339SmallYear), parseTimestamp(rfc3339SmallYear));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339WithLargeYear() {
-    // Year 9999 in range.
-    String rfc3339LargeYear = "9999-10-29T23:41:41.123999Z";
-    assertEquals(Instant.parse(rfc3339LargeYear), parseTimestamp(rfc3339LargeYear));
-  }
-
-  @Test
-  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    // Year 10000 out of range.
-    parseTimestamp("10000-10-29T23:41:41.123999Z");
-  }
-
   @Test
   public void testReadDisplayData() {
     String topic = "projects/project/topics/topic";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
new file mode 100644
index 0000000..40c31fb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubApiaryClient.
+ */
+public class PubsubApiaryClientTest {
+  private Pubsub mockPubsub;
+  private PubsubClient client;
+
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long PUB_TIME = 3456L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String CUSTOM_ID =
+      Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+  private static final String ACK_ID = "testAckId";
+
+  @Before
+  public void setup() throws IOException {
+    mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
+    client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    client.close();
+    client = null;
+    mockPubsub = null;
+  }
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+        new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .setMessageId(MESSAGE_ID)
+        .encodeData(DATA.getBytes())
+        .setPublishTime(String.valueOf(PUB_TIME))
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, CUSTOM_ID));
+    ReceivedMessage expectedReceivedMessage =
+        new ReceivedMessage().setMessage(expectedPubsubMessage)
+                             .setAckId(ACK_ID);
+    PullResponse expectedResponse =
+        new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
+    Mockito.when(mockPubsub.projects()
+                           .subscriptions()
+                           .pull(expectedSubscription, expectedRequest)
+                           .execute())
+           .thenReturn(expectedResponse);
+    List<IncomingMessage> actualMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+    assertEquals(1, actualMessages.size());
+    IncomingMessage onlyMessage = actualMessages.get(0);  // Exactly one message was stubbed.
+    assertEquals(ACK_ID, onlyMessage.ackId);
+    assertEquals(DATA, new String(onlyMessage.elementBytes));
+    assertEquals(CUSTOM_ID, new String(onlyMessage.recordId));
+    assertEquals(REQ_TIME, onlyMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, onlyMessage.timestampMsSinceEpoch);
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, CUSTOM_ID));
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when(mockPubsub.projects()
+                           .topics()
+                           .publish(expectedTopic, expectedRequest)
+                           .execute())
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
new file mode 100644
index 0000000..2250857
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Map;
+
+/**
+ * Tests for helper classes and methods in PubsubClient.
+ */
+public class PubsubClientTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  //
+  // Timestamp handling
+  //
+
+  private long parse(String timestamp) {
+    Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
+    return PubsubClient.extractTimestamp("myLabel", null, map);
+  }
+
+  private void roundTripRfc339(String timestamp) {
+    assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp));
+  }
+
+  private void truncatedRfc339(String timestamp, String truncatedTimestamp) {
+    assertEquals(Instant.parse(truncatedTimestamp).getMillis(), parse(timestamp));
+  }
+
+  @Test
+  public void noTimestampLabelReturnsPubsubPublish() {
+    final long time = 987654321L;
+    long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
+    assertEquals(time, timestamp);
+  }
+
+  @Test
+  public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    PubsubClient.extractTimestamp(null, "not-a-date", null);
+  }
+
+  @Test
+  public void timestampLabelWithNullAttributesThrowsError() {
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+    PubsubClient.extractTimestamp("myLabel", null, null);
+  }
+
+  @Test
+  public void timestampLabelSetWithMissingAttributeThrowsError() {
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+    Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
+    PubsubClient.extractTimestamp("myLabel", null, map);
+  }
+
+  @Test
+  public void timestampLabelParsesMillisecondsSinceEpoch() {
+    long time = 1446162101123L;
+    Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
+    long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+    assertEquals(time, timestamp);
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Seconds() {
+    roundTripRfc339("2015-10-29T23:41:41Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Tenths() {
+    roundTripRfc339("2015-10-29T23:41:41.1Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Hundredths() {
+    roundTripRfc339("2015-10-29T23:41:41.12Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Millis() {
+    roundTripRfc339("2015-10-29T23:41:41.123Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Micros() {
+    // Note: micros part 456/1000 is dropped.
+    truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339MicrosRounding() {
+    // Note: micros part 999/1000 is dropped, not rounded up.
+    truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
+  }
+
+  @Test
+  public void timestampLabelWithInvalidFormatThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    parse("not-a-timestamp");
+  }
+
+  @Test
+  public void timestampLabelWithInvalidFormat2ThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    parse("null");
+  }
+
+  @Test
+  public void timestampLabelWithInvalidFormat3ThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    parse("2015-10");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339WithSmallYear() {
+    // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
+    // This is therefore a "small year" until this difference is reconciled.
+    roundTripRfc339("1582-10-15T01:23:45.123Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339WithLargeYear() {
+    // Year 9999 in range.
+    roundTripRfc339("9999-10-29T23:41:41.123999Z");
+  }
+
+  @Test
+  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    // Year 10000 out of range.
+    parse("10000-10-29T23:41:41.123999Z");
+  }
+
+  //
+  // Paths
+  //
+
+  @Test
+  public void projectPathFromIdWellFormed() {
+    ProjectPath path = PubsubClient.projectPathFromId("test");
+    assertEquals("projects/test", path.getPath());
+  }
+
+  @Test
+  public void subscriptionPathFromNameWellFormed() {
+    SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
+    assertEquals("projects/test/subscriptions/something", path.getPath());
+    assertEquals("/subscriptions/test/something", path.getV1Beta1Path());
+  }
+
+  @Test
+  public void topicPathFromNameWellFormed() {
+    TopicPath path = PubsubClient.topicPathFromName("test", "something");
+    assertEquals("projects/test/topics/something", path.getPath());
+    assertEquals("/topics/test/something", path.getV1Beta1Path());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
new file mode 100644
index 0000000..189049c
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+
+import io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubGrpcClient.
+ */
+public class PubsubGrpcClientTest {
+  private ManagedChannel mockChannel;
+  private GoogleCredentials mockCredentials;
+  private PublisherGrpc.PublisherBlockingStub mockPublisherStub;
+  private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub;
+
+  private PubsubClient client;
+
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long PUB_TIME = 3456L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String CUSTOM_ID =
+      Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+  private static final String ACK_ID = "testAckId";
+
+  @Before
+  public void setup() throws IOException {
+    mockChannel = Mockito.mock(ManagedChannel.class);
+    mockCredentials = Mockito.mock(GoogleCredentials.class);
+    mockPublisherStub =
+        Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, Mockito.RETURNS_DEEP_STUBS);
+    mockSubscriberStub =
+        Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, Mockito.RETURNS_DEEP_STUBS);
+    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel,
+                                  mockCredentials, mockPublisherStub, mockSubscriberStub);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    client.close();
+    client = null;
+    mockChannel = null;
+    mockCredentials = null;
+    mockPublisherStub = null;
+    mockSubscriberStub = null;
+  }
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+        PullRequest.newBuilder()
+                   .setSubscription(expectedSubscription)
+                   .setReturnImmediately(true)
+                   .setMaxMessages(10)
+                   .build();
+    Timestamp timestamp = Timestamp.newBuilder()
+                                   .setSeconds(PUB_TIME / 1000)
+                                   .setNanos((int) (PUB_TIME % 1000) * 1000)
+                                   .build();
+    PubsubMessage expectedPubsubMessage =
+        PubsubMessage.newBuilder()
+                     .setMessageId(MESSAGE_ID)
+                     .setData(
+                         ByteString.copyFrom(DATA.getBytes()))
+                     .setPublishTime(timestamp)
+                     .putAllAttributes(
+                         ImmutableMap.of(TIMESTAMP_LABEL,
+                                         String.valueOf(MESSAGE_TIME),
+                                         ID_LABEL, CUSTOM_ID))
+                     .build();
+    ReceivedMessage expectedReceivedMessage =
+        ReceivedMessage.newBuilder()
+                       .setMessage(expectedPubsubMessage)
+                       .setAckId(ACK_ID)
+                       .build();
+    PullResponse expectedResponse =
+        PullResponse.newBuilder()
+                    .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
+                    .build();
+    Mockito.when(mockSubscriberStub.pull(expectedRequest))
+           .thenReturn(expectedResponse);
+    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+    assertEquals(1, acutalMessages.size());
+    IncomingMessage actualMessage = acutalMessages.get(0);
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes));
+    assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage =
+        PubsubMessage.newBuilder()
+                     .setData(ByteString.copyFrom(DATA.getBytes()))
+                     .putAllAttributes(
+                         ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                                         ID_LABEL, CUSTOM_ID))
+                     .build();
+    PublishRequest expectedRequest =
+        PublishRequest.newBuilder()
+                      .setTopic(expectedTopic)
+                      .addAllMessages(
+                          ImmutableList.of(expectedPubsubMessage))
+                      .build();
+    PublishResponse expectedResponse =
+        PublishResponse.newBuilder()
+                       .addAllMessageIds(ImmutableList.of(MESSAGE_ID))
+                       .build();
+    Mockito.when(mockPublisherStub.publish(expectedRequest))
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
new file mode 100644
index 0000000..7d8513b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubTestClient.
+ */
+public class PubsubTestClientTest {
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String ACK_ID = "testAckId";
+  private static final int ACK_TIMEOUT_S = 60;
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    IncomingMessage expectedIncomingMessage =
+        new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes());
+    try (PubsubTestClient client =
+             new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null,
+                                  Lists.newArrayList(expectedIncomingMessage))) {
+      long now = REQ_TIME;
+      client.advanceTo(now);
+      List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+      assertEquals(1, incomingMessages.size());
+      assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+      // Let the ACK deadline expire so the message is delivered again.
+      now += (ACK_TIMEOUT_S + 10) * 1000;
+      client.advanceTo(now);
+      incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+      assertEquals(1, incomingMessages.size());
+      assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
+      now += 10 * 1000;
+      client.advanceTo(now);
+      // Extend the ACK deadline by 20 seconds.
+      client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+      // Let the extended ACK deadline expire (30s > 20s) so the message is delivered again.
+      now += 30 * 1000;
+      client.advanceTo(now);
+      incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+      assertEquals(1, incomingMessages.size());
+      assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
+      // Extend the ACK deadline by 20 seconds again.
+      client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+      // Acknowledge within the extended deadline (15s < 20s).
+      now += 15 * 1000;
+      client.advanceTo(now);
+      client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+    }
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    try (PubsubTestClient client =
+             new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S,
+                                  Sets.newHashSet(expectedOutgoingMessage), null)) {
+      client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+    }
+  }
+}



[3/3] incubator-beam git commit: [BEAM-53] Add PubsubApiaryClient, PubsubTestClient

Posted by dh...@apache.org.
[BEAM-53] Add PubsubApiaryClient, PubsubTestClient

* Move PubsubClient and friends out of sdk.io and into sdk.util.
* Add PubsubApiaryClient since gRPC has onerous boot class path
  requirements which I don't wish to inflict upon other runners.
* Add PubsubTestClient in preparation for unit testing
  PubsubUnbounded{Source,Sink}.
* Unit tests for all of the above.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c509a85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c509a85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c509a85

Branch: refs/heads/master
Commit: 2c509a85671b425521c97894cced0b190b5ee51c
Parents: 3f0eead
Author: Mark Shields <ma...@google.com>
Authored: Tue Apr 26 18:41:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 08:22:47 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/PubsubClient.java    | 323 ------------
 .../apache/beam/sdk/io/PubsubGrpcClient.java    | 403 ---------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 252 ++++------
 .../beam/sdk/util/PubsubApiaryClient.java       | 293 +++++++++++
 .../org/apache/beam/sdk/util/PubsubClient.java  | 489 +++++++++++++++++++
 .../apache/beam/sdk/util/PubsubGrpcClient.java  | 433 ++++++++++++++++
 .../apache/beam/sdk/util/PubsubTestClient.java  | 291 +++++++++++
 .../org/apache/beam/sdk/util/Transport.java     |   4 +
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 157 ------
 .../beam/sdk/util/PubsubApiaryClientTest.java   | 134 +++++
 .../apache/beam/sdk/util/PubsubClientTest.java  | 189 +++++++
 .../beam/sdk/util/PubsubGrpcClientTest.java     | 170 +++++++
 .../beam/sdk/util/PubsubTestClientTest.java     |  97 ++++
 13 files changed, 2205 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java
deleted file mode 100644
index f92b480..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * A helper interface for talking to Pubsub via an underlying transport.
- */
-public interface PubsubClient extends AutoCloseable {
-  /**
-   * Path representing a cloud project id.
-   */
-  class ProjectPath implements Serializable {
-    private final String path;
-
-    public ProjectPath(String path) {
-      this.path = path;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      ProjectPath that = (ProjectPath) o;
-
-      return path.equals(that.path);
-
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-
-    public static ProjectPath fromId(String projectId) {
-      return new ProjectPath(String.format("projects/%s", projectId));
-    }
-  }
-
-  /**
-   * Path representing a Pubsub subscription.
-   */
-  class SubscriptionPath implements Serializable {
-    private final String path;
-
-    public SubscriptionPath(String path) {
-      this.path = path;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public String getV1Beta1Path() {
-      String[] splits = path.split("/");
-      checkState(splits.length == 4);
-      return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      SubscriptionPath that = (SubscriptionPath) o;
-      return path.equals(that.path);
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-
-    public static SubscriptionPath fromName(String projectId, String subscriptionName) {
-      return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
-          projectId, subscriptionName));
-    }
-  }
-
-  /**
-   * Path representing a Pubsub topic.
-   */
-  class TopicPath implements Serializable {
-    private final String path;
-
-    public TopicPath(String path) {
-      this.path = path;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public String getV1Beta1Path() {
-      String[] splits = path.split("/");
-      checkState(splits.length == 4);
-      return String.format("/topics/%s/%s", splits[1], splits[3]);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      TopicPath topicPath = (TopicPath) o;
-      return path.equals(topicPath.path);
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-
-    public static TopicPath fromName(String projectId, String topicName) {
-      return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
-    }
-  }
-
-  /**
-   * A message to be sent to Pubsub.
-   */
-  class OutgoingMessage {
-    /**
-     * Underlying (encoded) element.
-     */
-    public final byte[] elementBytes;
-
-    /**
-     * Timestamp for element (ms since epoch).
-     */
-    public final long timestampMsSinceEpoch;
-
-    public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
-      this.elementBytes = elementBytes;
-      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
-    }
-  }
-
-  /**
-   * A message received from Pubsub.
-   */
-  class IncomingMessage {
-    /**
-     * Underlying (encoded) element.
-     */
-    public final byte[] elementBytes;
-
-    /**
-     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
-     * or the custom timestamp associated with the message.
-     */
-    public final long timestampMsSinceEpoch;
-
-    /**
-     * Timestamp (in system time) at which we requested the message (ms since epoch).
-     */
-    public final long requestTimeMsSinceEpoch;
-
-    /**
-     * Id to pass back to Pubsub to acknowledge receipt of this message.
-     */
-    public final String ackId;
-
-    /**
-     * Id to pass to the runner to distinguish this message from all others.
-     */
-    public final byte[] recordId;
-
-    public IncomingMessage(
-        byte[] elementBytes,
-        long timestampMsSinceEpoch,
-        long requestTimeMsSinceEpoch,
-        String ackId,
-        byte[] recordId) {
-      this.elementBytes = elementBytes;
-      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
-      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
-      this.ackId = ackId;
-      this.recordId = recordId;
-    }
-  }
-
-  /**
-   * Gracefully close the underlying transport.
-   */
-  @Override
-  void close();
-
-
-  /**
-   * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
-   * published.
-   *
-   * @throws IOException
-   */
-  int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException;
-
-  /**
-   * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
-   * Return the received messages, or empty collection if none were available. Does not
-   * wait for messages to arrive. Returned messages will record heir request time
-   * as {@code requestTimeMsSinceEpoch}.
-   *
-   * @throws IOException
-   */
-  Collection<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize)
-      throws IOException;
-
-  /**
-   * Acknowldege messages from {@code subscription} with {@code ackIds}.
-   *
-   * @throws IOException
-   */
-  void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds) throws IOException;
-
-  /**
-   * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
-   * be {@code deadlineSeconds} from now.
-   *
-   * @throws IOException
-   */
-  void modifyAckDeadline(
-      SubscriptionPath subscription, Iterable<String> ackIds,
-      int deadlineSeconds)
-      throws IOException;
-
-  /**
-   * Create {@code topic}.
-   *
-   * @throws IOException
-   */
-  void createTopic(TopicPath topic) throws IOException;
-
-  /*
-   * Delete {@code topic}.
-   *
-   * @throws IOException
-   */
-  void deleteTopic(TopicPath topic) throws IOException;
-
-  /**
-   * Return a list of topics for {@code project}.
-   *
-   * @throws IOException
-   */
-  Collection<TopicPath> listTopics(ProjectPath project) throws IOException;
-
-  /**
-   * Create {@code subscription} to {@code topic}.
-   *
-   * @throws IOException
-   */
-  void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException;
-
-  /**
-   * Delete {@code subscription}.
-   *
-   * @throws IOException
-   */
-  void deleteSubscription(SubscriptionPath subscription) throws IOException;
-
-  /**
-   * Return a list of subscriptions for {@code topic} in {@code project}.
-   *
-   * @throws IOException
-   */
-  Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java
deleted file mode 100644
index 66fb61f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import org.apache.beam.sdk.options.GcpOptions;
-
-import com.google.api.client.util.DateTime;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.DeleteSubscriptionRequest;
-import com.google.pubsub.v1.DeleteTopicRequest;
-import com.google.pubsub.v1.ListSubscriptionsRequest;
-import com.google.pubsub.v1.ListSubscriptionsResponse;
-import com.google.pubsub.v1.ListTopicsRequest;
-import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
-import com.google.pubsub.v1.Subscription;
-import com.google.pubsub.v1.Topic;
-
-import io.grpc.Channel;
-import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
-import io.grpc.auth.ClientAuthInterceptor;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for talking to Pubsub via grpc.
- */
-public class PubsubGrpcClient implements PubsubClient {
-  private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
-  private static final int PUBSUB_PORT = 443;
-  private static final List<String> PUBSUB_SCOPES =
-      Collections.singletonList("https://www.googleapis.com/auth/pubsub");
-  private static final int LIST_BATCH_SIZE = 1000;
-
-  /**
-   * Timeout for grpc calls (in s).
-   */
-  private static final int TIMEOUT_S = 15;
-
-  /**
-   * Underlying netty channel, or {@literal null} if closed.
-   */
-  @Nullable
-  private ManagedChannel publisherChannel;
-
-  /**
-   * Credentials determined from options and environment.
-   */
-  private final GoogleCredentials credentials;
-
-  /**
-   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
-   * instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
-   */
-  @Nullable
-  private final String idLabel;
-
-  /**
-   * Cached stubs, or null if not cached.
-   */
-  @Nullable
-  private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
-  private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
-
-  private PubsubGrpcClient(
-      @Nullable String timestampLabel, @Nullable String idLabel,
-      ManagedChannel publisherChannel, GoogleCredentials credentials) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.publisherChannel = publisherChannel;
-    this.credentials = credentials;
-  }
-
-  /**
-   * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order
-   * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources
-   * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use
-   * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within
-   * message metadata.
-   */
-  public static PubsubGrpcClient newClient(
-      @Nullable String timestampLabel, @Nullable String idLabel,
-      GcpOptions options) throws IOException {
-    ManagedChannel channel = NettyChannelBuilder
-        .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
-        .negotiationType(NegotiationType.TLS)
-        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
-        .build();
-    // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
-    // various command line options. It currently only supports the older
-    // com.google.api.client.auth.oauth2.Credentials.
-    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
-    return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials);
-  }
-
-  /**
-   * Gracefully close the underlying netty channel.
-   */
-  @Override
-  public void close() {
-    Preconditions.checkState(publisherChannel != null, "Client has already been closed");
-    publisherChannel.shutdown();
-    try {
-      publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      // Ignore.
-      Thread.currentThread().interrupt();
-    }
-    publisherChannel = null;
-    cachedPublisherStub = null;
-    cachedSubscriberStub = null;
-  }
-
-  /**
-   * Return channel with interceptor for returning credentials.
-   */
-  private Channel newChannel() throws IOException {
-    Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
-    ClientAuthInterceptor interceptor =
-        new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
-    return ClientInterceptors.intercept(publisherChannel, interceptor);
-  }
-
-  /**
-   * Return a stub for making a publish request with a timeout.
-   */
-  private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException {
-    if (cachedPublisherStub == null) {
-      cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
-    }
-    return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Return a stub for making a subscribe request with a timeout.
-   */
-  private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException {
-    if (cachedSubscriberStub == null) {
-      cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
-    }
-    return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages)
-      throws IOException {
-    PublishRequest.Builder request = PublishRequest.newBuilder()
-                                                   .setTopic(topic.getPath());
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage.Builder message =
-          PubsubMessage.newBuilder()
-                       .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
-
-      if (timestampLabel != null) {
-        message.getMutableAttributes()
-               .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idLabel != null) {
-        message.getMutableAttributes()
-               .put(idLabel,
-                   Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
-      }
-
-      request.addMessages(message);
-    }
-
-    PublishResponse response = publisherStub().publish(request.build());
-    return response.getMessageIdsCount();
-  }
-
-  @Override
-  public Collection<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize) throws IOException {
-    PullRequest request = PullRequest.newBuilder()
-                                     .setSubscription(subscription.getPath())
-                                     .setReturnImmediately(true)
-                                     .setMaxMessages(batchSize)
-                                     .build();
-    PullResponse response = subscriberStub().pull(request);
-    if (response.getReceivedMessagesCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
-    for (ReceivedMessage message : response.getReceivedMessagesList()) {
-      PubsubMessage pubsubMessage = message.getMessage();
-      Map<String, String> attributes = pubsubMessage.getAttributes();
-
-      // Payload.
-      byte[] elementBytes = pubsubMessage.getData().toByteArray();
-
-      // Timestamp.
-      // Start with Pubsub processing time.
-      Timestamp timestampProto = pubsubMessage.getPublishTime();
-      long timestampMsSinceEpoch = timestampProto.getSeconds() + timestampProto.getNanos() / 1000L;
-      if (timestampLabel != null && attributes != null) {
-        String timestampString = attributes.get(timestampLabel);
-        if (timestampString != null && !timestampString.isEmpty()) {
-          try {
-            // Try parsing as milliseconds since epoch. Note there is no way to parse a
-            // string in RFC 3339 format here.
-            // Expected IllegalArgumentException if parsing fails; we use that to fall back
-            // to RFC 3339.
-            timestampMsSinceEpoch = Long.parseLong(timestampString);
-          } catch (IllegalArgumentException e1) {
-            try {
-              // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
-              // IllegalArgumentException if parsing fails, and the caller should handle.
-              timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue();
-            } catch (IllegalArgumentException e2) {
-              // Fallback to Pubsub processing time.
-            }
-          }
-        }
-        // else: fallback to Pubsub processing time.
-      }
-      // else: fallback to Pubsub processing time.
-
-      // Ack id.
-      String ackId = message.getAckId();
-      Preconditions.checkState(ackId != null && !ackId.isEmpty());
-
-      // Record id, if any.
-      @Nullable byte[] recordId = null;
-      if (idLabel != null && attributes != null) {
-        String recordIdString = attributes.get(idLabel);
-        if (recordIdString != null && !recordIdString.isEmpty()) {
-          recordId = recordIdString.getBytes();
-        }
-      }
-      if (recordId == null) {
-        recordId = pubsubMessage.getMessageId().getBytes();
-      }
-
-      incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
-          requestTimeMsSinceEpoch, ackId, recordId));
-    }
-    return incomingMessages;
-  }
-
-  @Override
-  public void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds)
-      throws IOException {
-    AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
-                                                   .setSubscription(subscription.getPath())
-                                                   .addAllAckIds(ackIds)
-                                                   .build();
-    subscriberStub().acknowledge(request); // ignore Empty result.
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, Iterable<String> ackIds, int
-      deadlineSeconds)
-      throws IOException {
-    ModifyAckDeadlineRequest request =
-        ModifyAckDeadlineRequest.newBuilder()
-                                .setSubscription(subscription.getPath())
-                                .addAllAckIds(ackIds)
-                                .setAckDeadlineSeconds(deadlineSeconds)
-                                .build();
-    subscriberStub().modifyAckDeadline(request); // ignore Empty result.
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    Topic request = Topic.newBuilder()
-                         .setName(topic.getPath())
-                         .build();
-    publisherStub().createTopic(request); // ignore Topic result.
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
-                                                   .setTopic(topic.getPath())
-                                                   .build();
-    publisherStub().deleteTopic(request); // ignore Empty result.
-  }
-
-  @Override
-  public Collection<TopicPath> listTopics(ProjectPath project) throws IOException {
-    ListTopicsRequest.Builder request =
-        ListTopicsRequest.newBuilder()
-                         .setProject(project.getPath())
-                         .setPageSize(LIST_BATCH_SIZE);
-    ListTopicsResponse response = publisherStub().listTopics(request.build());
-    if (response.getTopicsCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
-    while (true) {
-      for (Topic topic : response.getTopicsList()) {
-        topics.add(new TopicPath(topic.getName()));
-      }
-      if (response.getNextPageToken().isEmpty()) {
-        break;
-      }
-      request.setPageToken(response.getNextPageToken());
-      response = publisherStub().listTopics(request.build());
-    }
-    return topics;
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException {
-    Subscription request = Subscription.newBuilder()
-                                       .setTopic(topic.getPath())
-                                       .setName(subscription.getPath())
-                                       .setAckDeadlineSeconds(ackDeadlineSeconds)
-                                       .build();
-    subscriberStub().createSubscription(request); // ignore Subscription result.
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    DeleteSubscriptionRequest request =
-        DeleteSubscriptionRequest.newBuilder()
-                                 .setSubscription(subscription.getPath())
-                                 .build();
-    subscriberStub().deleteSubscription(request); // ignore Empty result.
-  }
-
-  @Override
-  public Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException {
-    ListSubscriptionsRequest.Builder request =
-        ListSubscriptionsRequest.newBuilder()
-                                .setProject(project.getPath())
-                                .setPageSize(LIST_BATCH_SIZE);
-    ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
-    if (response.getSubscriptionsCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
-    while (true) {
-      for (Subscription subscription : response.getSubscriptionsList()) {
-        if (subscription.getTopic().equals(topic.getPath())) {
-          subscriptions.add(new SubscriptionPath(subscription.getName()));
-        }
-      }
-      if (response.getNextPageToken().isEmpty()) {
-        break;
-      }
-      request.setPageToken(response.getNextPageToken());
-      response = subscriberStub().listSubscriptions(request.build());
-    }
-    return subscriptions;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 4646461..fa867c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -17,8 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -33,25 +32,18 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.PubsubApiaryClient;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 
-import com.google.api.client.util.Clock;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Strings;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -61,13 +53,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import javax.annotation.Nullable;
 
 /**
@@ -82,6 +71,9 @@ import javax.annotation.Nullable;
 public class PubsubIO {
   private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
 
+  /** Factory for creating pubsub client to manage transport. */
+  private static final PubsubClient.PubsubClientFactory FACTORY = PubsubApiaryClient.FACTORY;
+
   /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
   public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
 
@@ -143,48 +135,6 @@ public class PubsubIO {
   }
 
   /**
-   * Returns the {@link Instant} that corresponds to the timestamp in the supplied
-   * {@link PubsubMessage} under the specified {@code ink label}. See
-   * {@link PubsubIO.Read#timestampLabel(String)} for details about how these messages are
-   * parsed.
-   *
-   * <p>The {@link Clock} parameter is used to virtualize time for testing.
-   *
-   * @throws IllegalArgumentException if the timestamp label is provided, but there is no
-   *     corresponding attribute in the message or the value provided is not a valid timestamp
-   *     string.
-   * @see PubsubIO.Read#timestampLabel(String)
-   */
-  @VisibleForTesting
-  protected static Instant assignMessageTimestamp(
-      PubsubMessage message, @Nullable String label, Clock clock) {
-    if (label == null) {
-      return new Instant(clock.currentTimeMillis());
-    }
-
-    // Extract message attributes, defaulting to empty map if null.
-    Map<String, String> attributes = firstNonNull(
-        message.getAttributes(), ImmutableMap.<String, String>of());
-
-    String timestampStr = attributes.get(label);
-    checkArgument(timestampStr != null && !timestampStr.isEmpty(),
-        "PubSub message is missing a timestamp in label: %s", label);
-
-    long millisSinceEpoch;
-    try {
-      // Try parsing as milliseconds since epoch. Note there is no way to parse a string in
-      // RFC 3339 format here.
-      // Expected IllegalArgumentException if parsing fails; we use that to fall back to RFC 3339.
-      millisSinceEpoch = Long.parseLong(timestampStr);
-    } catch (IllegalArgumentException e) {
-      // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an IllegalArgumentException
-      // if parsing fails, and the caller should handle.
-      millisSinceEpoch = DateTime.parseRfc3339(timestampStr).getValue();
-    }
-    return new Instant(millisSinceEpoch);
-  }
-
-  /**
    * Class representing a Cloud Pub/Sub Subscription.
    */
   public static class PubsubSubscription implements Serializable {
@@ -679,8 +629,8 @@ public class PubsubIO {
 
         if (boundedOutput) {
           return input.getPipeline().begin()
-              .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-              .apply(ParDo.of(new PubsubReader())).setCoder(coder);
+                      .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
+                      .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder);
         } else {
           return PCollection.<T>createPrimitiveOutputInternal(
                   input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
@@ -740,86 +690,94 @@ public class PubsubIO {
         return maxReadTime;
       }
 
-      private class PubsubReader extends DoFn<Void, T> {
+      /**
+       * Default reader when Pubsub subscription has some form of upper bound.
+       * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top of upcoming
+       * PubsubUnboundedSource.
+       * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
+       * service.
+       */
+      private class PubsubBoundedReader extends DoFn<Void, T> {
         private static final int DEFAULT_PULL_SIZE = 100;
+        private static final int ACK_TIMEOUT_SEC = 60;
 
         @Override
         public void processElement(ProcessContext c) throws IOException {
-          Pubsub pubsubClient =
-              Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
-                  .build();
-
-          String subscription;
-          if (getSubscription() == null) {
-            String topic = getTopic().asPath();
-            String[] split = topic.split("/");
-            subscription =
-                "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_"
-                + new Random().nextLong();
-            Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic);
-            try {
-              pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
-            } catch (Exception e) {
-              throw new RuntimeException("Failed to create subscription: ", e);
+          try (PubsubClient pubsubClient =
+                   FACTORY.newClient(timestampLabel, idLabel,
+                                     c.getPipelineOptions().as(PubsubOptions.class))) {
+
+            PubsubClient.SubscriptionPath subscriptionPath;
+            if (getSubscription() == null) {
+              // Create a randomized subscription derived from the topic name.
+              String subscription = getTopic().topic + "_dataflow_" + new Random().nextLong();
+              // The subscription will be registered under this pipeline's project if we know it.
+              // Otherwise we'll fall back to the topic's project.
+              // Note that they don't need to be the same.
+              String project = c.getPipelineOptions().as(PubsubOptions.class).getProject();
+              if (Strings.isNullOrEmpty(project)) {
+                project = getTopic().project;
+              }
+              subscriptionPath = PubsubClient.subscriptionPathFromName(project, subscription);
+              TopicPath topicPath =
+                  PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
+              try {
+                pubsubClient.createSubscription(topicPath, subscriptionPath, ACK_TIMEOUT_SEC);
+              } catch (Exception e) {
+                throw new RuntimeException("Failed to create subscription: ", e);
+              }
+            } else {
+              subscriptionPath =
+                  PubsubClient.subscriptionPathFromName(getSubscription().project,
+                                                        getSubscription().subscription);
             }
-          } else {
-            subscription = getSubscription().asPath();
-          }
 
-          Instant endTime = (getMaxReadTime() == null)
-              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
+            Instant endTime = (getMaxReadTime() == null)
+                              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
 
-          List<PubsubMessage> messages = new ArrayList<>();
+            List<IncomingMessage> messages = new ArrayList<>();
 
-          Throwable finallyBlockException = null;
-          try {
-            while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
-                && Instant.now().isBefore(endTime)) {
-              PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
-              if (getMaxNumRecords() > 0) {
-                pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
-              } else {
-                pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
-              }
+            Throwable finallyBlockException = null;
+            try {
+              while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
+                     && Instant.now().isBefore(endTime)) {
+                int batchSize = DEFAULT_PULL_SIZE;
+                if (getMaxNumRecords() > 0) {
+                  batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
+                }
 
-              PullResponse pullResponse =
-                  pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
-              List<String> ackIds = new ArrayList<>();
-              if (pullResponse.getReceivedMessages() != null) {
-                for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
-                  messages.add(received.getMessage());
-                  ackIds.add(received.getAckId());
+                List<IncomingMessage> batchMessages =
+                    pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
+                        false);
+                List<String> ackIds = new ArrayList<>();
+                for (IncomingMessage message : batchMessages) {
+                  messages.add(message);
+                  ackIds.add(message.ackId);
+                }
+                if (ackIds.size() != 0) {
+                  pubsubClient.acknowledge(subscriptionPath, ackIds);
                 }
               }
-
-              if (ackIds.size() != 0) {
-                AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
-                pubsubClient.projects()
-                    .subscriptions()
-                    .acknowledge(subscription, ackRequest)
-                    .execute();
+            } catch (IOException e) {
+              throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
+            } finally {
+              if (getSubscription() == null) {
+                try {
+                  pubsubClient.deleteSubscription(subscriptionPath);
+                } catch (Exception e) {
+                  finallyBlockException = e;
+                }
               }
             }
-          } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
-          } finally {
-            if (getTopic() != null) {
-              try {
-                pubsubClient.projects().subscriptions().delete(subscription).execute();
-              } catch (IOException e) {
-                finallyBlockException = new RuntimeException("Failed to delete subscription: ", e);
-                LOG.error("Failed to delete subscription: ", e);
-              }
+            if (finallyBlockException != null) {
+              throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
             }
-          }
-          if (finallyBlockException != null) {
-            throw new RuntimeException(finallyBlockException);
-          }
 
-          for (PubsubMessage message : messages) {
-            c.outputWithTimestamp(
-                CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
-                assignMessageTimestamp(message, getTimestampLabel(), Clock.SYSTEM));
+            for (IncomingMessage message : messages) {
+              c.outputWithTimestamp(
+                  CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes),
+                  new Instant(message.timestampMsSinceEpoch));
+            }
           }
         }
       }
@@ -1026,31 +984,28 @@ public class PubsubIO {
         return coder;
       }
 
+      /**
+       * Writer to Pubsub which batches messages.
+       * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
+       * service.
+       */
       private class PubsubWriter extends DoFn<T, Void> {
         private static final int MAX_PUBLISH_BATCH_SIZE = 100;
-        private transient List<PubsubMessage> output;
-        private transient Pubsub pubsubClient;
+        private transient List<OutgoingMessage> output;
+        private transient PubsubClient pubsubClient;
 
         @Override
-        public void startBundle(Context c) {
+        public void startBundle(Context c) throws IOException {
           this.output = new ArrayList<>();
-          this.pubsubClient =
-              Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
-                  .build();
+          this.pubsubClient = FACTORY.newClient(timestampLabel, idLabel,
+                                                c.getPipelineOptions().as(PubsubOptions.class));
         }
 
         @Override
         public void processElement(ProcessContext c) throws IOException {
-          PubsubMessage message =
-              new PubsubMessage().encodeData(CoderUtils.encodeToByteArray(getCoder(), c.element()));
-          if (getTimestampLabel() != null) {
-            Map<String, String> attributes = message.getAttributes();
-            if (attributes == null) {
-              attributes = new HashMap<>();
-              message.setAttributes(attributes);
-            }
-            attributes.put(getTimestampLabel(), String.valueOf(c.timestamp().getMillis()));
-          }
+          OutgoingMessage message =
+              new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()),
+                  c.timestamp().getMillis());
           output.add(message);
 
           if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
@@ -1063,13 +1018,16 @@ public class PubsubIO {
           if (!output.isEmpty()) {
             publish();
           }
+          output = null;
+          pubsubClient.close();
+          pubsubClient = null;
         }
 
         private void publish() throws IOException {
-          PublishRequest publishRequest = new PublishRequest().setMessages(output);
-          pubsubClient.projects().topics()
-              .publish(getTopic().asPath(), publishRequest)
-              .execute();
+          int n = pubsubClient.publish(
+              PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
+              output);
+          checkState(n == output.size());
           output.clear();
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
new file mode 100644
index 0000000..f0a9096
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * A Pubsub client using Apiary.
+ */
+public class PubsubApiaryClient extends PubsubClient {
+
+  /**
+   * Factory which builds an Apiary {@link Pubsub} client from the pipeline options and wraps
+   * it in a {@link PubsubApiaryClient}.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubClientFactory() {
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        throws IOException {
+      Builder builder = new Builder(
+          Transport.getTransport(),
+          Transport.getJsonFactory(),
+          new ChainingHttpRequestInitializer(
+              options.getGcpCredential(),
+              // Do not log 404. It clutters the output and is possibly even required by the caller.
+              new RetryHttpRequestInitializer(ImmutableList.of(404))));
+      builder.setRootUrl(options.getPubsubRootUrl());
+      builder.setApplicationName(options.getAppName());
+      builder.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+      return new PubsubApiaryClient(timestampLabel, idLabel, builder.build());
+    }
+  };
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if the Pubsub publish time should be
+   * used instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if the Pubsub-provided ids should be used.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Underlying Apiary client.
+   */
+  private Pubsub pubsub;
+
+  /**
+   * Wraps the given Apiary {@code pubsub} client.
+   *
+   * @param timestampLabel attribute name used for custom timestamps, or {@code null}
+   * @param idLabel attribute name used for custom record ids, or {@code null}
+   * @param pubsub Apiary client through which every request is issued
+   */
+  @VisibleForTesting
+  PubsubApiaryClient(@Nullable String timestampLabel, @Nullable String idLabel, Pubsub pubsub) {
+    this.pubsub = pubsub;
+    this.idLabel = idLabel;
+    this.timestampLabel = timestampLabel;
+  }
+
+  @Override
+  public void close() {
+    // Intentionally empty: this client has nothing to close.
+  }
+
+  /**
+   * Publishes {@code outgoingMessages} to {@code topic} in a single request.
+   *
+   * <p>If a timestamp label is configured, each message carries its timestamp (milliseconds
+   * since epoch, as a decimal string) under that attribute. If an id label is configured, each
+   * message carries a murmur3 128-bit hash of its payload under that attribute.
+   *
+   * @return the number of message ids in the response, or 0 if the response carries no id list
+   * @throws IOException if the publish request fails
+   */
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
+
+      // Only attach an attribute map when at least one label will be written into it.
+      Map<String, String> attributes = pubsubMessage.getAttributes();
+      if ((timestampLabel != null || idLabel != null) && attributes == null) {
+        attributes = new TreeMap<>();
+        pubsubMessage.setAttributes(attributes);
+      }
+
+      if (timestampLabel != null) {
+        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null) {
+        attributes.put(idLabel,
+                       Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+      }
+
+      pubsubMessages.add(pubsubMessage);
+    }
+    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
+    PublishResponse response = pubsub.projects()
+                                     .topics()
+                                     .publish(topic.getPath(), request)
+                                     .execute();
+    // Guard against a missing id list, as the other response accessors in this class do,
+    // so callers see a count mismatch rather than a NullPointerException.
+    List<String> messageIds = response.getMessageIds();
+    return messageIds == null ? 0 : messageIds.size();
+  }
+
+  /**
+   * Pulls at most {@code batchSize} messages from {@code subscription}.
+   *
+   * <p>Each message's timestamp comes from {@code extractTimestamp}, given the configured
+   * timestamp label, the publish time and the message attributes. Its record id is the
+   * UTF-8 bytes of the id-label attribute when that is present and non-empty, and otherwise
+   * the UTF-8 bytes of the Pubsub message id.
+   *
+   * @param requestTimeMsSinceEpoch request time recorded on every returned message
+   * @param returnImmediately passed through to the pull request
+   * @return the received messages, or an empty list if the response carried none
+   * @throws IOException if the pull request fails
+   * @throws IllegalStateException if a received message has a null or empty ack id
+   */
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = new PullRequest()
+        .setReturnImmediately(returnImmediately)
+        .setMaxMessages(batchSize);
+    PullResponse response = pubsub.projects()
+                                  .subscriptions()
+                                  .pull(subscription.getPath(), request)
+                                  .execute();
+    List<ReceivedMessage> receivedMessages = response.getReceivedMessages();
+    if (receivedMessages == null || receivedMessages.isEmpty()) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new ArrayList<>(receivedMessages.size());
+    for (ReceivedMessage message : receivedMessages) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.decodeData();
+
+      // Timestamp.
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, pubsubMessage.getPublishTime(), attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any. Encode with an explicit charset so the bytes do not depend on
+      // the default charset of the JVM that happens to pull the message.
+      @Nullable byte[] recordId = null;
+      if (idLabel != null && attributes != null) {
+        String recordIdString = attributes.get(idLabel);
+        if (!Strings.isNullOrEmpty(recordIdString)) {
+          recordId = recordIdString.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }
+      }
+      if (recordId == null) {
+        recordId = pubsubMessage.getMessageId().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+                                               requestTimeMsSinceEpoch, ackId, recordId));
+    }
+
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
+    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+    pubsub.projects()
+          .subscriptions()
+          .acknowledge(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest request =
+        new ModifyAckDeadlineRequest().setAckIds(ackIds)
+                                      .setAckDeadlineSeconds(deadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .modifyAckDeadline(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .create(topic.getPath(), new Topic())
+          .execute(); // ignore Topic result.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .delete(topic.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    ListTopicsResponse response = pubsub.projects()
+                                        .topics()
+                                        .list(project.getPath())
+                                        .execute();
+    if (response.getTopics() == null || response.getTopics().isEmpty()) {
+      return ImmutableList.of();
+    }
+    List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
+    for (Topic topic : response.getTopics()) {
+      topics.add(topicPathFromPath(topic.getName()));
+    }
+    return topics;
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription request = new Subscription()
+        .setTopic(topic.getPath())
+        .setAckDeadlineSeconds(ackDeadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .create(subscription.getPath(), request)
+          .execute(); // ignore Subscription result.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    pubsub.projects()
+          .subscriptions()
+          .delete(subscription.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException {
+    ListSubscriptionsResponse response = pubsub.projects()
+                                               .subscriptions()
+                                               .list(project.getPath())
+                                               .execute();
+    if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) {
+      return ImmutableList.of();
+    }
+    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size());
+    for (Subscription subscription : response.getSubscriptions()) {
+      if (subscription.getTopic().equals(topic.getPath())) {
+        subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+      }
+    }
+    return subscriptions;
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+    return response.getAckDeadlineSeconds();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
new file mode 100644
index 0000000..a44329d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.api.client.util.DateTime;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * An (abstract) helper class for talking to Pubsub via an underlying transport.
+ */
+public abstract class PubsubClient implements Closeable {
+  /**
+   * Factory for creating clients.
+   */
+  public interface PubsubClientFactory extends Serializable {
+    /**
+     * Construct a new Pubsub client. It should be closed via {@link #close} in order
+     * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
+     * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
+     * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
+     * timestamps/ids within message metadata.
+     */
+    PubsubClient newClient(
+        @Nullable String timestampLabel,
+        @Nullable String idLabel,
+        PubsubOptions options) throws IOException;
+  }
+
+  /**
+   * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}.
+   * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException}
+   * if timestamp cannot be recognized.
+   */
+  @Nullable
+  private static Long asMsSinceEpoch(@Nullable String timestamp) {
+    if (Strings.isNullOrEmpty(timestamp)) {
+      return null;
+    }
+    try {
+      // Try parsing as milliseconds since epoch. Note there is no way to parse a
+      // string in RFC 3339 format here.
+      // Expected IllegalArgumentException if parsing fails; we use that to fall back
+      // to RFC 3339.
+      return Long.parseLong(timestamp);
+    } catch (IllegalArgumentException e1) {
+      // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
+      // IllegalArgumentException if parsing fails, and the caller should handle.
+      return DateTime.parseRfc3339(timestamp).getValue();
+    }
+  }
+
+  /**
+   * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
+   * attributes} and {@code pubsubTimestamp}.
+   * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
+   * that label, and the value of that label will be taken as the timestamp.
+   * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
+   * pubsubTimestamp}. Throw {@link IllegalArgumentException} if the timestamp cannot be
+   * recognized as a ms-since-unix-epoch or RFC3339 time.
+   *
+   * @throws IllegalArgumentException
+   */
+  protected static long extractTimestamp(
+      @Nullable String timestampLabel,
+      @Nullable String pubsubTimestamp,
+      @Nullable Map<String, String> attributes) {
+    Long timestampMsSinceEpoch;
+    if (Strings.isNullOrEmpty(timestampLabel)) {
+      timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
+      checkArgument(timestampMsSinceEpoch != null,
+                    "Cannot interpret PubSub publish timestamp: %s",
+                    pubsubTimestamp);
+    } else {
+      String value = attributes == null ? null : attributes.get(timestampLabel);
+      checkArgument(value != null,
+                    "PubSub message is missing a value for timestamp label %s",
+                    timestampLabel);
+      timestampMsSinceEpoch = asMsSinceEpoch(value);
+      checkArgument(timestampMsSinceEpoch != null,
+                    "Cannot interpret value of label %s as timestamp: %s",
+                    timestampLabel, value);
+    }
+    return timestampMsSinceEpoch;
+  }
+
+  /**
+   * Path representing a cloud project id.
+   */
+  public static class ProjectPath implements Serializable {
+    private final String path;
+
+    ProjectPath(String path) {
+      this.path = path;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ProjectPath that = (ProjectPath) o;
+
+      return path.equals(that.path);
+
+    }
+
+    @Override
+    public int hashCode() {
+      return path.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return path;
+    }
+  }
+
+  public static ProjectPath projectPathFromPath(String path) {
+    return new ProjectPath(path);
+  }
+
+  public static ProjectPath projectPathFromId(String projectId) {
+    return new ProjectPath(String.format("projects/%s", projectId));
+  }
+
+  /**
+   * Path representing a Pubsub subscription.
+   */
+  public static class SubscriptionPath implements Serializable {
+    private final String path;
+
+    SubscriptionPath(String path) {
+      this.path = path;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public String getV1Beta1Path() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 4, "Malformed subscription path %s", path);
+      return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SubscriptionPath that = (SubscriptionPath) o;
+      return path.equals(that.path);
+    }
+
+    @Override
+    public int hashCode() {
+      return path.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return path;
+    }
+  }
+
+  public static SubscriptionPath subscriptionPathFromPath(String path) {
+      return new SubscriptionPath(path);
+  }
+
+  public static SubscriptionPath subscriptionPathFromName(
+      String projectId, String subscriptionName) {
+    return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
+                                              projectId, subscriptionName));
+  }
+
+  /**
+   * Path representing a Pubsub topic.
+   */
+  public static class TopicPath implements Serializable {
+    private final String path;
+
+    TopicPath(String path) {
+      this.path = path;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public String getV1Beta1Path() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 4, "Malformed topic path %s", path);
+      return String.format("/topics/%s/%s", splits[1], splits[3]);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TopicPath topicPath = (TopicPath) o;
+      return path.equals(topicPath.path);
+    }
+
+    @Override
+    public int hashCode() {
+      return path.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return path;
+    }
+  }
+
+  public static TopicPath topicPathFromPath(String path) {
+    return new TopicPath(path);
+  }
+
+  public static TopicPath topicPathFromName(String projectId, String topicName) {
+    return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
+  }
+
+  /**
+   * A message to be sent to Pubsub.
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  public static class OutgoingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    /**
+     * Timestamp for element (ms since epoch).
+     */
+    public final long timestampMsSinceEpoch;
+
+    public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
+      this.elementBytes = elementBytes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      OutgoingMessage that = (OutgoingMessage) o;
+
+      if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
+        return false;
+      }
+      return Arrays.equals(elementBytes, that.elementBytes);
+
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch);
+    }
+  }
+
+  /**
+   * A message received from Pubsub.
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  public static class IncomingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    /**
+     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
+     * or the custom timestamp associated with the message.
+     */
+    public final long timestampMsSinceEpoch;
+
+    /**
+     * Timestamp (in system time) at which we requested the message (ms since epoch).
+     */
+    public final long requestTimeMsSinceEpoch;
+
+    /**
+     * Id to pass back to Pubsub to acknowledge receipt of this message.
+     */
+    public final String ackId;
+
+    /**
+     * Id to pass to the runner to distinguish this message from all others.
+     */
+    public final byte[] recordId;
+
+    public IncomingMessage(
+        byte[] elementBytes,
+        long timestampMsSinceEpoch,
+        long requestTimeMsSinceEpoch,
+        String ackId,
+        byte[] recordId) {
+      this.elementBytes = elementBytes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+      this.ackId = ackId;
+      this.recordId = recordId;
+    }
+
+    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+      return new IncomingMessage(elementBytes, timestampMsSinceEpoch, requestTimeMsSinceEpoch,
+                                 ackId, recordId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      IncomingMessage that = (IncomingMessage) o;
+
+      if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
+        return false;
+      }
+      if (requestTimeMsSinceEpoch != that.requestTimeMsSinceEpoch) {
+        return false;
+      }
+      if (!Arrays.equals(elementBytes, that.elementBytes)) {
+        return false;
+      }
+      if (!ackId.equals(that.ackId)) {
+        return false;
+      }
+      return Arrays.equals(recordId, that.recordId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch,
+                              requestTimeMsSinceEpoch,
+                              ackId, Arrays.hashCode(recordId));
+    }
+  }
+
+  /**
+   * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+   * published.
+   *
+   * @throws IOException
+   */
+  public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException;
+
+  /**
+   * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+   * Return the received messages, or empty collection if none were available. Does not
+   * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
+   * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
+   *
+   * @throws IOException
+   */
+  public abstract List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately)
+      throws IOException;
+
+  /**
+   * Acknowledge messages from {@code subscription} with {@code ackIds}.
+   *
+   * @throws IOException
+   */
+  public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+      throws IOException;
+
+  /**
+   * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+   * be {@code deadlineSeconds} from now.
+   *
+   * @throws IOException
+   */
+  public abstract void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds,
+      int deadlineSeconds) throws IOException;
+
+  /**
+   * Create {@code topic}.
+   *
+   * @throws IOException
+   */
+  public abstract void createTopic(TopicPath topic) throws IOException;
+
+  /**
+   * Delete {@code topic}.
+   *
+   * @throws IOException
+   */
+  public abstract void deleteTopic(TopicPath topic) throws IOException;
+
+  /**
+   * Return a list of topics for {@code project}.
+   *
+   * @throws IOException
+   */
+  public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+  /**
+   * Create {@code subscription} to {@code topic}.
+   *
+   * @throws IOException
+   */
+  public abstract void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
+
+  /**
+   * Delete {@code subscription}.
+   *
+   * @throws IOException
+   */
+  public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+  /**
+   * Return a list of subscriptions for {@code topic} in {@code project}.
+   *
+   * @throws IOException
+   */
+  public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException;
+
+  /**
+   * Return the ack deadline, in seconds, for {@code subscription}.
+   *
+   * @throws IOException
+   */
+  public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
new file mode 100644
index 0000000..b3c1b8f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeleteSubscriptionRequest;
+import com.google.pubsub.v1.DeleteTopicRequest;
+import com.google.pubsub.v1.GetSubscriptionRequest;
+import com.google.pubsub.v1.ListSubscriptionsRequest;
+import com.google.pubsub.v1.ListSubscriptionsResponse;
+import com.google.pubsub.v1.ListTopicsRequest;
+import com.google.pubsub.v1.ListTopicsResponse;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for talking to Pubsub via grpc.
+ */
+public class PubsubGrpcClient extends PubsubClient {
+  private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
+  private static final int PUBSUB_PORT = 443;
+  private static final List<String> PUBSUB_SCOPES =
+      Collections.singletonList("https://www.googleapis.com/auth/pubsub");
+  private static final int LIST_BATCH_SIZE = 1000;
+
+  private static final int DEFAULT_TIMEOUT_S = 15;
+
+  public static final PubsubClientFactory FACTORY =
+      new PubsubClientFactory() {
+        @Override
+        public PubsubClient newClient(
+            @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+            throws IOException {
+          ManagedChannel channel = NettyChannelBuilder
+              .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+              .negotiationType(NegotiationType.TLS)
+              .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+              .build();
+          // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
+          // various command line options. It currently only supports the older
+          // com.google.api.client.auth.oauth2.Credentials.
+          GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+          return new PubsubGrpcClient(timestampLabel,
+                                      idLabel,
+                                      DEFAULT_TIMEOUT_S,
+                                      channel,
+                                      credentials,
+                                      null /* publisher stub */,
+                                      null /* subscriber stub */);
+        }
+      };
+
+  /**
+   * Timeout for grpc calls (in s).
+   */
+  private final int timeoutSec;
+
+  /**
+   * Underlying netty channel, or {@literal null} if closed.
+   */
+  @Nullable
+  private ManagedChannel publisherChannel;
+
+  /**
+   * Credentials determined from options and environment.
+   */
+  private final GoogleCredentials credentials;
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   */
+  @Nullable
+  private final String idLabel;
+
+
+  /**
+   * Cached stubs, or null if not cached.
+   */
+  @Nullable
+  private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
+  private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
+
+  @VisibleForTesting
+  PubsubGrpcClient(
+      @Nullable String timestampLabel,
+      @Nullable String idLabel,
+      int timeoutSec,
+      ManagedChannel publisherChannel,
+      GoogleCredentials credentials,
+      PublisherGrpc.PublisherBlockingStub cachedPublisherStub,
+      SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.timeoutSec = timeoutSec;
+    this.publisherChannel = publisherChannel;
+    this.credentials = credentials;
+    this.cachedPublisherStub = cachedPublisherStub;
+    this.cachedSubscriberStub = cachedSubscriberStub;
+  }
+
+  /**
+   * Gracefully close the underlying netty channel.
+   */
+  @Override
+  public void close() {
+    if (publisherChannel == null) {
+      // Already closed.
+      return;
+    }
+    // Can gc the underlying stubs.
+    cachedPublisherStub = null;
+    cachedSubscriberStub = null;
+    // Mark the client as having been closed before going further
+    // in case we have an exception from the channel.
+    ManagedChannel publisherChannel = this.publisherChannel;
+    this.publisherChannel = null;
+    // Gracefully shutdown the channel.
+    publisherChannel.shutdown();
+    if (timeoutSec > 0) {
+      try {
+        publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        // Ignore.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Return channel with interceptor for returning credentials.
+   */
+  private Channel newChannel() throws IOException {
+    checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
+    ClientAuthInterceptor interceptor =
+        new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
+    return ClientInterceptors.intercept(publisherChannel, interceptor);
+  }
+
+  /**
+   * Return a stub for making a publish request with a timeout.
+   */
+  private PublisherBlockingStub publisherStub() throws IOException {
+    if (cachedPublisherStub == null) {
+      cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
+    }
+    if (timeoutSec > 0) {
+      return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
+    } else {
+      return cachedPublisherStub;
+    }
+  }
+
+  /**
+   * Return a stub for making a subscribe request with a timeout.
+   */
+  private SubscriberBlockingStub subscriberStub() throws IOException {
+    if (cachedSubscriberStub == null) {
+      cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
+    }
+    if (timeoutSec > 0) {
+      return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
+    } else {
+      return cachedSubscriberStub;
+    }
+  }
+
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    PublishRequest.Builder request = PublishRequest.newBuilder()
+                                                   .setTopic(topic.getPath());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage.Builder message =
+          PubsubMessage.newBuilder()
+                       .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+      if (timestampLabel != null) {
+        message.getMutableAttributes()
+               .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null) {
+        message.getMutableAttributes()
+               .put(idLabel,
+                    Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+      }
+
+      request.addMessages(message);
+    }
+
+    PublishResponse response = publisherStub().publish(request.build());
+    return response.getMessageIdsCount();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = PullRequest.newBuilder()
+                                     .setSubscription(subscription.getPath())
+                                     .setReturnImmediately(returnImmediately)
+                                     .setMaxMessages(batchSize)
+                                     .build();
+    PullResponse response = subscriberStub().pull(request);
+    if (response.getReceivedMessagesCount() == 0) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
+    for (ReceivedMessage message : response.getReceivedMessagesList()) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
+      // Timestamp. Convert the publish time proto (seconds + nanos) to ms since epoch.
+      String pubsubTimestampString = null;
+      Timestamp timestampProto = pubsubMessage.getPublishTime();
+      if (timestampProto != null) {
+        pubsubTimestampString = String.valueOf(timestampProto.getSeconds() * 1000L
+                                               + timestampProto.getNanos() / 1000000L);
+      }
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any.
+      @Nullable byte[] recordId = null;
+      if (idLabel != null && attributes != null) {
+        String recordIdString = attributes.get(idLabel);
+        if (recordIdString != null && !recordIdString.isEmpty()) {
+          recordId = recordIdString.getBytes();
+        }
+      }
+      if (recordId == null) {
+        recordId = pubsubMessage.getMessageId().getBytes();
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+                                               requestTimeMsSinceEpoch, ackId, recordId));
+    }
+    return incomingMessages;
+  }
+
+  /**
+   * Sends an acknowledge request carrying {@code ackIds} for {@code subscription}.
+   *
+   * @param subscription subscription whose path is placed in the request
+   * @param ackIds ack ids added to the request
+   */
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+      throws IOException {
+    AcknowledgeRequest.Builder ackBuilder = AcknowledgeRequest.newBuilder();
+    ackBuilder.setSubscription(subscription.getPath());
+    ackBuilder.addAllAckIds(ackIds);
+    AcknowledgeRequest ackRequest = ackBuilder.build();
+    // The call only yields an Empty message, so its result is dropped.
+    subscriberStub().acknowledge(ackRequest);
+  }
+
+  /**
+   * Sends a modify-ack-deadline request for {@code ackIds} on {@code subscription}.
+   *
+   * @param subscription subscription whose path is placed in the request
+   * @param ackIds ack ids added to the request
+   * @param deadlineSeconds value passed as the request's ack deadline, in seconds
+   */
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest.Builder deadlineBuilder = ModifyAckDeadlineRequest.newBuilder();
+    deadlineBuilder.setSubscription(subscription.getPath());
+    deadlineBuilder.addAllAckIds(ackIds);
+    deadlineBuilder.setAckDeadlineSeconds(deadlineSeconds);
+    ModifyAckDeadlineRequest deadlineRequest = deadlineBuilder.build();
+    // The call only yields an Empty message, so its result is dropped.
+    subscriberStub().modifyAckDeadline(deadlineRequest);
+  }
+
+  /**
+   * Issues a create-topic call for a {@code Topic} named after {@code topic}'s path.
+   *
+   * @param topic path used as the name of the topic in the request
+   */
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    Topic.Builder topicBuilder = Topic.newBuilder();
+    topicBuilder.setName(topic.getPath());
+    Topic topicToCreate = topicBuilder.build();
+    // The Topic echoed back by the call is not needed.
+    publisherStub().createTopic(topicToCreate);
+  }
+
+  /**
+   * Issues a delete-topic call for {@code topic}.
+   *
+   * @param topic path placed in the delete request
+   */
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    DeleteTopicRequest.Builder deleteBuilder = DeleteTopicRequest.newBuilder();
+    deleteBuilder.setTopic(topic.getPath());
+    DeleteTopicRequest deleteRequest = deleteBuilder.build();
+    // The call only yields an Empty message, so its result is dropped.
+    publisherStub().deleteTopic(deleteRequest);
+  }
+
+  /**
+   * Lists the topics of {@code project}, following pagination to the end.
+   *
+   * <p>Topics are requested {@code LIST_BATCH_SIZE} at a time. Paging stops only when a
+   * response carries an empty next-page token; a page that happens to contain no topics but
+   * still has a token does not end the listing early.
+   *
+   * @param project project whose path is placed in the list request
+   * @return paths of all topics returned by the service, possibly empty
+   */
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    ListTopicsRequest.Builder request =
+        ListTopicsRequest.newBuilder()
+                         .setProject(project.getPath())
+                         .setPageSize(LIST_BATCH_SIZE);
+    List<TopicPath> topics = new ArrayList<>();
+    while (true) {
+      ListTopicsResponse response = publisherStub().listTopics(request.build());
+      for (Topic topic : response.getTopicsList()) {
+        topics.add(topicPathFromPath(topic.getName()));
+      }
+      // An empty token is the only end-of-listing signal; an empty page is not.
+      String nextPageToken = response.getNextPageToken();
+      if (nextPageToken.isEmpty()) {
+        return topics;
+      }
+      request.setPageToken(nextPageToken);
+    }
+  }
+
+  /**
+   * Issues a create-subscription call binding {@code subscription} to {@code topic}.
+   *
+   * @param topic topic whose path is set on the new subscription
+   * @param subscription path used as the name of the new subscription
+   * @param ackDeadlineSeconds value set as the subscription's ack deadline, in seconds
+   */
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription.Builder subscriptionBuilder = Subscription.newBuilder();
+    subscriptionBuilder.setTopic(topic.getPath());
+    subscriptionBuilder.setName(subscription.getPath());
+    subscriptionBuilder.setAckDeadlineSeconds(ackDeadlineSeconds);
+    Subscription subscriptionToCreate = subscriptionBuilder.build();
+    // The Subscription echoed back by the call is not needed.
+    subscriberStub().createSubscription(subscriptionToCreate);
+  }
+
+  /**
+   * Issues a delete-subscription call for {@code subscription}.
+   *
+   * @param subscription path placed in the delete request
+   */
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    DeleteSubscriptionRequest.Builder deleteBuilder = DeleteSubscriptionRequest.newBuilder();
+    deleteBuilder.setSubscription(subscription.getPath());
+    DeleteSubscriptionRequest deleteRequest = deleteBuilder.build();
+    // The call only yields an Empty message, so its result is dropped.
+    subscriberStub().deleteSubscription(deleteRequest);
+  }
+
+  /**
+   * Lists the subscriptions of {@code project} that are attached to {@code topic}.
+   *
+   * <p>Subscriptions are requested for the whole project, {@code LIST_BATCH_SIZE} at a time,
+   * and filtered here by comparing each subscription's topic with {@code topic}'s path. Paging
+   * stops only when a response carries an empty next-page token; a page that happens to
+   * contain no subscriptions but still has a token does not end the listing early.
+   *
+   * @param project project whose path is placed in the list request
+   * @param topic only subscriptions whose topic equals this topic's path are kept
+   * @return paths of the matching subscriptions, possibly empty
+   */
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException {
+    ListSubscriptionsRequest.Builder request =
+        ListSubscriptionsRequest.newBuilder()
+                                .setProject(project.getPath())
+                                .setPageSize(LIST_BATCH_SIZE);
+    // Not pre-sized: the number of matches is unknown until pages are filtered by topic.
+    List<SubscriptionPath> subscriptions = new ArrayList<>();
+    while (true) {
+      ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
+      for (Subscription subscription : response.getSubscriptionsList()) {
+        if (subscription.getTopic().equals(topic.getPath())) {
+          subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+        }
+      }
+      // An empty token is the only end-of-listing signal; an empty page is not.
+      String nextPageToken = response.getNextPageToken();
+      if (nextPageToken.isEmpty()) {
+        return subscriptions;
+      }
+      request.setPageToken(nextPageToken);
+    }
+  }
+
+  /**
+   * Fetches {@code subscription} and reports its ack deadline.
+   *
+   * @param subscription path placed in the get-subscription request
+   * @return the ack deadline, in seconds, read from the returned subscription
+   */
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    GetSubscriptionRequest.Builder lookupBuilder = GetSubscriptionRequest.newBuilder();
+    lookupBuilder.setSubscription(subscription.getPath());
+    GetSubscriptionRequest lookupRequest = lookupBuilder.build();
+    return subscriberStub().getSubscription(lookupRequest).getAckDeadlineSeconds();
+  }
+}