You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/04 15:23:05 UTC

[2/3] incubator-beam git commit: [BEAM-53] Add PubsubApiaryClient, PubsubTestClient

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
new file mode 100644
index 0000000..4a47c30
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods.
+ */
+public class PubsubTestClient extends PubsubClient {
+  public static PubsubClientFactory createFactoryForPublish(
+      final TopicPath expectedTopic,
+      final Set<OutgoingMessage> expectedOutgoingMessages) {
+    return new PubsubClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
+      }
+    };
+  }
+
+  public static PubsubClientFactory createFactoryForPull(
+      @Nullable final SubscriptionPath expectedSubscription,
+      final int ackTimeoutSec,
+      @Nullable final List<IncomingMessage> expectedIncomingMessages) {
+    return new PubsubClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
+                                    null, expectedIncomingMessages);
+      }
+    };
+  }
+
+  /**
+   * Only publish calls for this topic are allowed.
+   */
+  @Nullable
+  private TopicPath expectedTopic;
+  /**
+   * Only pull calls for this subscription are allowed.
+   */
+  @Nullable
+  private SubscriptionPath expectedSubscription;
+
+  /**
+   * Timeout to simulate.
+   */
+  private int ackTimeoutSec;
+
+  /**
+   * Messages yet to seen in a {@link #publish} call.
+   */
+  @Nullable
+  private Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+  /**
+   * Messages waiting to be received by a {@link #pull} call.
+   */
+  @Nullable
+  private List<IncomingMessage> remainingPendingIncomingMessages;
+
+  /**
+   * Messages which have been returned from a {@link #pull} call and
+   * not yet ACKed by an {@link #acknowledge} call.
+   */
+  private Map<String, IncomingMessage> pendingAckIncommingMessages;
+
+  /**
+   * When above messages are due to have their ACK deadlines expire.
+   */
+  private Map<String, Long> ackDeadline;
+
+  /**
+   * Current time.
+   */
+  private long nowMsSinceEpoch;
+
+  @VisibleForTesting
+  PubsubTestClient(
+      @Nullable TopicPath expectedTopic,
+      @Nullable SubscriptionPath expectedSubscription,
+      int ackTimeoutSec,
+      @Nullable Set<OutgoingMessage> expectedOutgoingMessages,
+      @Nullable List<IncomingMessage> expectedIncomingMessages) {
+    this.expectedTopic = expectedTopic;
+    this.expectedSubscription = expectedSubscription;
+    this.ackTimeoutSec = ackTimeoutSec;
+
+    this.remainingExpectedOutgoingMessages = expectedOutgoingMessages;
+    this.remainingPendingIncomingMessages = expectedIncomingMessages;
+
+    this.pendingAckIncommingMessages = new HashMap<>();
+    this.ackDeadline = new HashMap<>();
+    this.nowMsSinceEpoch = Long.MIN_VALUE;
+  }
+
+  /**
+   * Advance wall-clock time to {@code newNowMsSinceEpoch}. This will simulate Pubsub expiring
+   * outstanding ACKs.
+   */
+  public void advanceTo(long newNowMsSinceEpoch) {
+    checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch,
+                  "Cannot advance time backwards from %d to %d", nowMsSinceEpoch,
+                  newNowMsSinceEpoch);
+    nowMsSinceEpoch = newNowMsSinceEpoch;
+    // Any messages who's ACKs timed out are available for re-pulling.
+    Iterator<Map.Entry<String, Long>> deadlineItr = ackDeadline.entrySet().iterator();
+    while (deadlineItr.hasNext()) {
+      Map.Entry<String, Long> entry = deadlineItr.next();
+      if (entry.getValue() <= nowMsSinceEpoch) {
+        remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey()));
+        deadlineItr.remove();
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    if (remainingExpectedOutgoingMessages != null) {
+      checkState(this.remainingExpectedOutgoingMessages.isEmpty(),
+                 "Failed to pull %d messages", this.remainingExpectedOutgoingMessages.size());
+      remainingExpectedOutgoingMessages = null;
+    }
+    if (remainingPendingIncomingMessages != null) {
+      checkState(remainingPendingIncomingMessages.isEmpty(),
+                 "Failed to publish %d messages", remainingPendingIncomingMessages.size());
+      checkState(pendingAckIncommingMessages.isEmpty(),
+                 "Failed to ACK %d messages", pendingAckIncommingMessages.size());
+      checkState(ackDeadline.isEmpty(),
+                 "Failed to ACK %d messages", ackDeadline.size());
+      remainingPendingIncomingMessages = null;
+      pendingAckIncommingMessages = null;
+      ackDeadline = null;
+    }
+  }
+
+  @Override
+  public int publish(
+      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+    checkNotNull(expectedTopic, "Missing expected topic");
+    checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages");
+    checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic,
+               expectedTopic);
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage),
+                 "Unexpeced outgoing message %s", outgoingMessage);
+    }
+    return outgoingMessages.size();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+      boolean returnImmediately) throws IOException {
+    checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch,
+               "Simulated time %d does not match requset time %d", nowMsSinceEpoch,
+               requestTimeMsSinceEpoch);
+    checkNotNull(expectedSubscription, "Missing expected subscription");
+    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+    checkState(subscription.equals(expectedSubscription),
+               "Subscription %s does not match expected %s", subscription, expectedSubscription);
+    checkState(returnImmediately, "PubsubTestClient only supports returning immediately");
+
+    List<IncomingMessage> incomingMessages = new ArrayList<>();
+    Iterator<IncomingMessage> pendItr = remainingPendingIncomingMessages.iterator();
+    while (pendItr.hasNext()) {
+      IncomingMessage incomingMessage = pendItr.next();
+      pendItr.remove();
+      IncomingMessage incomingMessageWithRequestTime =
+          incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+      incomingMessages.add(incomingMessageWithRequestTime);
+      pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId,
+                                      incomingMessageWithRequestTime);
+      ackDeadline.put(incomingMessageWithRequestTime.ackId,
+                      requestTimeMsSinceEpoch + ackTimeoutSec * 1000);
+      if (incomingMessages.size() >= batchSize) {
+        break;
+      }
+    }
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(
+      SubscriptionPath subscription,
+      List<String> ackIds) throws IOException {
+    checkNotNull(expectedSubscription, "Missing expected subscription");
+    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+    checkState(subscription.equals(expectedSubscription),
+               "Subscription %s does not match expected %s", subscription, expectedSubscription);
+
+    for (String ackId : ackIds) {
+      checkState(ackDeadline.remove(ackId) != null,
+                 "No message with ACK id %s is outstanding", ackId);
+      checkState(pendingAckIncommingMessages.remove(ackId) != null,
+                 "No message with ACK id %s is outstanding", ackId);
+    }
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+    checkNotNull(expectedSubscription, "Missing expected subscription");
+    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+    checkState(subscription.equals(expectedSubscription),
+               "Subscription %s does not match expected %s", subscription, expectedSubscription);
+
+    for (String ackId : ackIds) {
+      checkState(ackDeadline.remove(ackId) != null,
+                 "No message with ACK id %s is outstanding", ackId);
+      checkState(pendingAckIncommingMessages.containsKey(ackId),
+                 "No message with ACK id %s is outstanding", ackId);
+      ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * 1000);
+    }
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(
+      ProjectPath project, TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    return ackTimeoutSec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 9082ce3..6daecdb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -105,7 +105,11 @@ public class Transport {
 
   /**
    * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+   *
+   * @deprecated Use an appropriate
+   * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory}
    */
+  @Deprecated
   public static Pubsub.Builder
       newPubsubClient(PubsubOptions options) {
     return new Pubsub.Builder(getTransport(), getJsonFactory(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 1e5bf51..eaf452d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -24,22 +24,13 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
-import com.google.api.services.pubsub.model.PubsubMessage;
-
 import org.joda.time.Duration;
-import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.HashMap;
-
-import javax.annotation.Nullable;
-
 /**
  * Tests for PubsubIO Read and Write transforms.
  */
@@ -90,154 +81,6 @@ public class PubsubIOTest {
         .toString());
   }
 
-  /**
-   * Helper function that creates a {@link PubsubMessage} with the given timestamp registered as
-   * an attribute with the specified label.
-   *
-   * <p>If {@code label} is {@code null}, then the attributes are {@code null}.
-   *
-   * <p>Else, if {@code timestamp} is {@code null}, then attributes are present but have no key for
-   * the label.
-   */
-  private static PubsubMessage messageWithTimestamp(
-      @Nullable String label, @Nullable String timestamp) {
-    PubsubMessage message = new PubsubMessage();
-    if (label == null) {
-      message.setAttributes(null);
-      return message;
-    }
-
-    message.setAttributes(new HashMap<String, String>());
-
-    if (timestamp == null) {
-      return message;
-    }
-
-    message.getAttributes().put(label, timestamp);
-    return message;
-  }
-
-  /**
-   * Helper function that parses the given string to a timestamp through the PubSubIO plumbing.
-   */
-  private static Instant parseTimestamp(@Nullable String timestamp) {
-    PubsubMessage message = messageWithTimestamp("mylabel", timestamp);
-    return PubsubIO.assignMessageTimestamp(message, "mylabel", Clock.SYSTEM);
-  }
-
-  @Test
-  public void noTimestampLabelReturnsNow() {
-    final long time = 987654321L;
-    Instant timestamp = PubsubIO.assignMessageTimestamp(
-        messageWithTimestamp(null, null), null, new FixedClock(time));
-
-    assertEquals(new Instant(time), timestamp);
-  }
-
-  @Test
-  public void timestampLabelWithNullAttributesThrowsError() {
-    PubsubMessage message = messageWithTimestamp(null, null);
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel");
-
-    PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM);
-  }
-
-  @Test
-  public void timestampLabelSetWithMissingAttributeThrowsError() {
-    PubsubMessage message = messageWithTimestamp("notMyLabel", "ignored");
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel");
-
-    PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM);
-  }
-
-  @Test
-  public void timestampLabelParsesMillisecondsSinceEpoch() {
-    Long millis = 1446162101123L;
-    assertEquals(new Instant(millis), parseTimestamp(millis.toString()));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Seconds() {
-    String rfc3339 = "2015-10-29T23:41:41Z";
-    assertEquals(Instant.parse(rfc3339), parseTimestamp(rfc3339));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Tenths() {
-    String rfc3339tenths = "2015-10-29T23:41:41.1Z";
-    assertEquals(Instant.parse(rfc3339tenths), parseTimestamp(rfc3339tenths));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Hundredths() {
-    String rfc3339hundredths = "2015-10-29T23:41:41.12Z";
-    assertEquals(Instant.parse(rfc3339hundredths), parseTimestamp(rfc3339hundredths));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Millis() {
-    String rfc3339millis = "2015-10-29T23:41:41.123Z";
-    assertEquals(Instant.parse(rfc3339millis), parseTimestamp(rfc3339millis));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Micros() {
-    String rfc3339micros = "2015-10-29T23:41:41.123456Z";
-    assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros));
-    // Note: micros part 456/1000 is dropped.
-    assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339MicrosRounding() {
-    String rfc3339micros = "2015-10-29T23:41:41.123999Z";
-    assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros));
-    // Note: micros part 999/1000 is dropped, not rounded up.
-    assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros));
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormatThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parseTimestamp("not-a-timestamp");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormat2ThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parseTimestamp("null");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormat3ThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parseTimestamp("2015-10");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339WithSmallYear() {
-    // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
-    // This is therefore a "small year" until this difference is reconciled.
-    String rfc3339SmallYear = "1582-10-15T01:23:45.123Z";
-    assertEquals(Instant.parse(rfc3339SmallYear), parseTimestamp(rfc3339SmallYear));
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339WithLargeYear() {
-    // Year 9999 in range.
-    String rfc3339LargeYear = "9999-10-29T23:41:41.123999Z";
-    assertEquals(Instant.parse(rfc3339LargeYear), parseTimestamp(rfc3339LargeYear));
-  }
-
-  @Test
-  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    // Year 10000 out of range.
-    parseTimestamp("10000-10-29T23:41:41.123999Z");
-  }
-
   @Test
   public void testReadDisplayData() {
     String topic = "projects/project/topics/topic";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
new file mode 100644
index 0000000..40c31fb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubApiaryClient.
+ */
+public class PubsubApiaryClientTest {
+  private Pubsub mockPubsub;
+  private PubsubClient client;
+
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long PUB_TIME = 3456L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String CUSTOM_ID =
+      Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+  private static final String ACK_ID = "testAckId";
+
+  @Before
+  public void setup() throws IOException {
+    mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
+    client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    client.close();
+    client = null;
+    mockPubsub = null;
+  }
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+        new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .setMessageId(MESSAGE_ID)
+        .encodeData(DATA.getBytes())
+        .setPublishTime(String.valueOf(PUB_TIME))
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, CUSTOM_ID));
+    ReceivedMessage expectedReceivedMessage =
+        new ReceivedMessage().setMessage(expectedPubsubMessage)
+                             .setAckId(ACK_ID);
+    PullResponse expectedResponse =
+        new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
+    Mockito.when(mockPubsub.projects()
+                           .subscriptions()
+                           .pull(expectedSubscription, expectedRequest)
+                           .execute())
+           .thenReturn(expectedResponse);
+    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+    assertEquals(1, acutalMessages.size());
+    IncomingMessage actualMessage = acutalMessages.get(0);
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes));
+    assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, CUSTOM_ID));
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when(mockPubsub.projects()
+                           .topics()
+                           .publish(expectedTopic, expectedRequest)
+                           .execute())
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
new file mode 100644
index 0000000..2250857
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Map;
+
+/**
+ * Tests for helper classes and methods in PubsubClient.
+ */
+public class PubsubClientTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  //
+  // Timestamp handling
+  //
+
+  private long parse(String timestamp) {
+    Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
+    return PubsubClient.extractTimestamp("myLabel", null, map);
+  }
+
+  private void roundTripRfc339(String timestamp) {
+    assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp));
+  }
+
+  private void truncatedRfc339(String timestamp, String truncatedTimestmap) {
+    assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp));
+  }
+
+  @Test
+  public void noTimestampLabelReturnsPubsubPublish() {
+    final long time = 987654321L;
+    long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
+    assertEquals(time, timestamp);
+  }
+
+  @Test
+  public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    PubsubClient.extractTimestamp(null, "not-a-date", null);
+  }
+
+  @Test
+  public void timestampLabelWithNullAttributesThrowsError() {
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+    PubsubClient.extractTimestamp("myLabel", null, null);
+  }
+
+  @Test
+  public void timestampLabelSetWithMissingAttributeThrowsError() {
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+    Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
+    PubsubClient.extractTimestamp("myLabel", null, map);
+  }
+
+  @Test
+  public void timestampLabelParsesMillisecondsSinceEpoch() {
+    long time = 1446162101123L;
+    Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
+    long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+    assertEquals(time, timestamp);
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Seconds() {
+    roundTripRfc339("2015-10-29T23:41:41Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Tenths() {
+    roundTripRfc339("2015-10-29T23:41:41.1Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Hundredths() {
+    roundTripRfc339("2015-10-29T23:41:41.12Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Millis() {
+    roundTripRfc339("2015-10-29T23:41:41.123Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339Micros() {
+    // Note: micros part 456/1000 is dropped.
+    truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339MicrosRounding() {
+    // Note: micros part 999/1000 is dropped, not rounded up.
+    truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
+  }
+
+  @Test
+  public void timestampLabelWithInvalidFormatThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    parse("not-a-timestamp");
+  }
+
+  @Test
+  public void timestampLabelWithInvalidFormat2ThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    parse("null");
+  }
+
+  @Test
+  public void timestampLabelWithInvalidFormat3ThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    parse("2015-10");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339WithSmallYear() {
+    // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
+    // This is therefore a "small year" until this difference is reconciled.
+    roundTripRfc339("1582-10-15T01:23:45.123Z");
+  }
+
+  @Test
+  public void timestampLabelParsesRfc3339WithLargeYear() {
+    // Year 9999 in range.
+    roundTripRfc339("9999-10-29T23:41:41.123999Z");
+  }
+
+  @Test
+  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+    thrown.expect(NumberFormatException.class);
+    // Year 10000 out of range.
+    parse("10000-10-29T23:41:41.123999Z");
+  }
+
+  //
+  // Paths
+  //
+
+  @Test
+  public void projectPathFromIdWellFormed() {
+    ProjectPath path = PubsubClient.projectPathFromId("test");
+    assertEquals("projects/test", path.getPath());
+  }
+
+  @Test
+  public void subscriptionPathFromNameWellFormed() {
+    SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
+    assertEquals("projects/test/subscriptions/something", path.getPath());
+    assertEquals("/subscriptions/test/something", path.getV1Beta1Path());
+  }
+
+  @Test
+  public void topicPathFromNameWellFormed() {
+    TopicPath path = PubsubClient.topicPathFromName("test", "something");
+    assertEquals("projects/test/topics/something", path.getPath());
+    assertEquals("/topics/test/something", path.getV1Beta1Path());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
new file mode 100644
index 0000000..189049c
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+
+import io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubGrpcClient.
+ */
+public class PubsubGrpcClientTest {
+  private ManagedChannel mockChannel;
+  private GoogleCredentials mockCredentials;
+  private PublisherGrpc.PublisherBlockingStub mockPublisherStub;
+  private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub;
+
+  private PubsubClient client;
+
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long PUB_TIME = 3456L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String CUSTOM_ID =
+      Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+  private static final String ACK_ID = "testAckId";
+
+  @Before
+  public void setup() throws IOException {
+    mockChannel = Mockito.mock(ManagedChannel.class);
+    mockCredentials = Mockito.mock(GoogleCredentials.class);
+    mockPublisherStub =
+        Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, Mockito.RETURNS_DEEP_STUBS);
+    mockSubscriberStub =
+        Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, Mockito.RETURNS_DEEP_STUBS);
+    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel,
+                                  mockCredentials, mockPublisherStub, mockSubscriberStub);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    client.close();
+    client = null;
+    mockChannel = null;
+    mockCredentials = null;
+    mockPublisherStub = null;
+    mockSubscriberStub = null;
+  }
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+        PullRequest.newBuilder()
+                   .setSubscription(expectedSubscription)
+                   .setReturnImmediately(true)
+                   .setMaxMessages(10)
+                   .build();
+    Timestamp timestamp = Timestamp.newBuilder()
+                                   .setSeconds(PUB_TIME / 1000)
+                                   .setNanos((int) (PUB_TIME % 1000) * 1000)
+                                   .build();
+    PubsubMessage expectedPubsubMessage =
+        PubsubMessage.newBuilder()
+                     .setMessageId(MESSAGE_ID)
+                     .setData(
+                         ByteString.copyFrom(DATA.getBytes()))
+                     .setPublishTime(timestamp)
+                     .putAllAttributes(
+                         ImmutableMap.of(TIMESTAMP_LABEL,
+                                         String.valueOf(MESSAGE_TIME),
+                                         ID_LABEL, CUSTOM_ID))
+                     .build();
+    ReceivedMessage expectedReceivedMessage =
+        ReceivedMessage.newBuilder()
+                       .setMessage(expectedPubsubMessage)
+                       .setAckId(ACK_ID)
+                       .build();
+    PullResponse expectedResponse =
+        PullResponse.newBuilder()
+                    .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
+                    .build();
+    Mockito.when(mockSubscriberStub.pull(expectedRequest))
+           .thenReturn(expectedResponse);
+    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+    assertEquals(1, acutalMessages.size());
+    IncomingMessage actualMessage = acutalMessages.get(0);
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes));
+    assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage =
+        PubsubMessage.newBuilder()
+                     .setData(ByteString.copyFrom(DATA.getBytes()))
+                     .putAllAttributes(
+                         ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                                         ID_LABEL, CUSTOM_ID))
+                     .build();
+    PublishRequest expectedRequest =
+        PublishRequest.newBuilder()
+                      .setTopic(expectedTopic)
+                      .addAllMessages(
+                          ImmutableList.of(expectedPubsubMessage))
+                      .build();
+    PublishResponse expectedResponse =
+        PublishResponse.newBuilder()
+                       .addAllMessageIds(ImmutableList.of(MESSAGE_ID))
+                       .build();
+    Mockito.when(mockPublisherStub.publish(expectedRequest))
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
new file mode 100644
index 0000000..7d8513b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubTestClient.
+ */
+public class PubsubTestClientTest {
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String ACK_ID = "testAckId";
+  private static final int ACK_TIMEOUT_S = 60;
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    IncomingMessage expectedIncomingMessage =
+        new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes());
+    try (PubsubTestClient client =
+             new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null,
+                                  Lists.newArrayList(expectedIncomingMessage))) {
+      long now = REQ_TIME;
+      client.advanceTo(now);
+      List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+      assertEquals(1, incomingMessages.size());
+      assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+      // Timeout on ACK.
+      now += (ACK_TIMEOUT_S + 10) * 1000;
+      client.advanceTo(now);
+      incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+      assertEquals(1, incomingMessages.size());
+      assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
+      now += 10 * 1000;
+      client.advanceTo(now);
+      // Extend ack
+      client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+      // Timeout on extended ACK
+      now += 30 * 1000;
+      client.advanceTo(now);
+      incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+      assertEquals(1, incomingMessages.size());
+      assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
+      // Extend ack
+      client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+      // Ack
+      now += 15 * 1000;
+      client.advanceTo(now);
+      client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+    }
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+    try (PubsubTestClient client =
+             new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S,
+                                  Sets.newHashSet(expectedOutgoingMessage), null)) {
+      client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+    }
+  }
+}