You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:59 UTC

[28/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
new file mode 100644
index 0000000..d290994
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Tests for PubsubJsonClient.
+ */
+@RunWith(JUnit4.class)
+public class PubsubJsonClientTest {
+  private Pubsub mockPubsub;
+  private PubsubClient client;
+
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long PUB_TIME = 3456L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String RECORD_ID = "testRecordId";
+  private static final String ACK_ID = "testAckId";
+
+  @Before
+  public void setup() throws IOException {
+    mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
+    client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    client.close();
+    client = null;
+    mockPubsub = null;
+  }
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+        new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .setMessageId(MESSAGE_ID)
+        .encodeData(DATA.getBytes())
+        .setPublishTime(String.valueOf(PUB_TIME))
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, RECORD_ID));
+    ReceivedMessage expectedReceivedMessage =
+        new ReceivedMessage().setMessage(expectedPubsubMessage)
+                             .setAckId(ACK_ID);
+    PullResponse expectedResponse =
+        new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
+    Mockito.when((Object) (mockPubsub.projects()
+                               .subscriptions()
+                               .pull(expectedSubscription, expectedRequest)
+                               .execute()))
+           .thenReturn(expectedResponse);
+    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+    assertEquals(1, acutalMessages.size());
+    IncomingMessage actualMessage = acutalMessages.get(0);
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes));
+    assertEquals(RECORD_ID, actualMessage.recordId);
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.<String, String> builder()
+                    .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
+                    .put(ID_LABEL, RECORD_ID)
+                    .put("k", "v").build());
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when((Object) (mockPubsub.projects()
+                                .topics()
+                                .publish(expectedTopic, expectedRequest)
+                                .execute()))
+           .thenReturn(expectedResponse);
+    Map<String, String> attrs = new HashMap<>();
+    attrs.put("k", "v");
+    OutgoingMessage actualMessage = new OutgoingMessage(
+            DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
new file mode 100644
index 0000000..18180af
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for PubsubTestClient.
+ */
+@RunWith(JUnit4.class)
+public class PubsubTestClientTest {
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String ACK_ID = "testAckId";
+  private static final int ACK_TIMEOUT_S = 60;
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    final AtomicLong now = new AtomicLong();
+    Clock clock = new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        return now.get();
+      }
+    };
+    IncomingMessage expectedIncomingMessage =
+        new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
+                                                   Lists.newArrayList(expectedIncomingMessage))) {
+      try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
+        now.set(REQ_TIME);
+        client.advance();
+        List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+        assertEquals(1, incomingMessages.size());
+        assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+        // Timeout on ACK.
+        now.addAndGet((ACK_TIMEOUT_S + 10) * 1000);
+        client.advance();
+        incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+        assertEquals(1, incomingMessages.size());
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
+        now.addAndGet(10 * 1000);
+        client.advance();
+        // Extend ack
+        client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+        // Timeout on extended ACK
+        now.addAndGet(30 * 1000);
+        client.advance();
+        incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+        assertEquals(1, incomingMessages.size());
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
+        // Extend ack
+        client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+        // Ack
+        now.addAndGet(15 * 1000);
+        client.advance();
+        client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+      }
+    }
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    OutgoingMessage expectedOutgoingMessage =
+        new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID);
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(
+                 TOPIC,
+                 Sets.newHashSet(expectedOutgoingMessage),
+                 ImmutableList.<OutgoingMessage>of())) {
+      try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
+        client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
new file mode 100644
index 0000000..be425d4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test PubsubUnboundedSink.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSinkTest implements Serializable {
+  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final String DATA = "testData";
+  private static final Map<String, String> ATTRIBUTES =
+          ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
+  private static final long TIMESTAMP = 1234L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final int NUM_SHARDS = 10;
+
+  private static class Stamp extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
+    }
+  }
+
+  private String getRecordId(String data) {
+    return Hashing.murmur3_128().hashBytes(data.getBytes()).toString();
+  }
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void saneCoder() throws Exception {
+    OutgoingMessage message = new OutgoingMessage(
+            DATA.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA));
+    CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
+    CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void sendOneMessage() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(new OutgoingMessage(
+                DATA.getBytes(),
+                ATTRIBUTES,
+                TIMESTAMP, getRecordId(DATA)));
+    int batchSize = 1;
+    int batchBytes = 1;
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+                                                      ImmutableList.<OutgoingMessage>of())) {
+      PubsubUnboundedSink<String> sink =
+          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
+              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              Duration.standardSeconds(2),
+              new SimpleFunction<String, PubsubIO.PubsubMessage>() {
+                @Override
+                public PubsubIO.PubsubMessage apply(String input) {
+                  return new PubsubIO.PubsubMessage(input.getBytes(), ATTRIBUTES);
+                }
+              },
+              RecordIdMethod.DETERMINISTIC);
+      p.apply(Create.of(ImmutableList.of(DATA)))
+       .apply(ParDo.of(new Stamp()))
+       .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual published
+    // message does not match the expected publish message.
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void sendMoreThanOneBatchByNumMessages() throws IOException {
+    List<OutgoingMessage> outgoing = new ArrayList<>();
+    List<String> data = new ArrayList<>();
+    int batchSize = 2;
+    int batchBytes = 1000;
+    for (int i = 0; i < batchSize * 10; i++) {
+      String str = String.valueOf(i);
+      outgoing.add(new OutgoingMessage(
+              str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
+      data.add(str);
+    }
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+                                                      ImmutableList.<OutgoingMessage>of())) {
+      PubsubUnboundedSink<String> sink =
+          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
+              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
+      p.apply(Create.of(data))
+       .apply(ParDo.of(new Stamp()))
+       .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual published
+    // message does not match the expected publish message.
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void sendMoreThanOneBatchByByteSize() throws IOException {
+    List<OutgoingMessage> outgoing = new ArrayList<>();
+    List<String> data = new ArrayList<>();
+    int batchSize = 100;
+    int batchBytes = 10;
+    int n = 0;
+    while (n < batchBytes * 10) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < batchBytes; i++) {
+        sb.append(String.valueOf(n));
+      }
+      String str = sb.toString();
+      outgoing.add(new OutgoingMessage(
+              str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
+      data.add(str);
+      n += str.length();
+    }
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+                                                      ImmutableList.<OutgoingMessage>of())) {
+      PubsubUnboundedSink<String> sink =
+          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
+              StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+              NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+              null, RecordIdMethod.DETERMINISTIC);
+      p.apply(Create.of(data))
+       .apply(ParDo.of(new Stamp()))
+       .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual published
+    // message does not match the expected publish message.
+  }
+
+  // TODO: We would like to test that failed Pubsub publish calls cause the already assigned
+  // (and random) record ids to be reused. However that can't be done without the test runnner
+  // supporting retrying bundles.
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
new file mode 100644
index 0000000..d2e88c3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubCheckpoint;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubReader;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test PubsubUnboundedSource.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSourceTest {
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final String DATA = "testData";
+  private static final long TIMESTAMP = 1234L;
+  private static final long REQ_TIME = 6373L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String ACK_ID = "testAckId";
+  private static final String RECORD_ID = "testRecordId";
+  private static final int ACK_TIMEOUT_S = 60;
+
+  private AtomicLong now;
+  private Clock clock;
+  private PubsubTestClientFactory factory;
+  private PubsubSource<String> primSource;
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  private void setupOneMessage(Iterable<IncomingMessage> incoming) {
+    now = new AtomicLong(REQ_TIME);
+    clock = new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        return now.get();
+      }
+    };
+    factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
+    PubsubUnboundedSource<String> source =
+        new PubsubUnboundedSource<>(
+            clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
+            StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
+    primSource = new PubsubSource<>(source);
+  }
+
+  private void setupOneMessage() {
+    setupOneMessage(ImmutableList.of(
+        new IncomingMessage(DATA.getBytes(), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
+  }
+
+  @After
+  public void after() throws IOException {
+    factory.close();
+    now = null;
+    clock = null;
+    primSource = null;
+    factory = null;
+  }
+
+  @Test
+  public void checkpointCoderIsSane() throws Exception {
+    setupOneMessage(ImmutableList.<IncomingMessage>of());
+    CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
+    // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make
+    // equals on checkpoints ignore those fields, we'll test serialization and deserialization
+    // of checkpoints in multipleReaders below.
+  }
+
+  @Test
+  public void readOneMessage() throws IOException {
+    setupOneMessage();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    // Read one message.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    assertFalse(reader.advance());
+    // ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void timeoutAckAndRereadOneMessage() throws IOException {
+    setupOneMessage();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Let the ACK deadline for the above expire.
+    now.addAndGet(65 * 1000);
+    pubsubClient.advance();
+    // We'll now receive the same message again.
+    assertTrue(reader.advance());
+    assertEquals(DATA, reader.getCurrent());
+    assertFalse(reader.advance());
+    // Now ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void extendAck() throws IOException {
+    setupOneMessage();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
+    // Pull the first message but don't take a checkpoint for it.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Extend the ack
+    now.addAndGet(55 * 1000);
+    pubsubClient.advance();
+    assertFalse(reader.advance());
+    // Extend the ack again
+    now.addAndGet(25 * 1000);
+    pubsubClient.advance();
+    assertFalse(reader.advance());
+    // Now ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void timeoutAckExtensions() throws IOException {
+    setupOneMessage();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
+    // Pull the first message but don't take a checkpoint for it.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Extend the ack.
+    now.addAndGet(55 * 1000);
+    pubsubClient.advance();
+    assertFalse(reader.advance());
+    // Let the ack expire.
+    for (int i = 0; i < 3; i++) {
+      now.addAndGet(25 * 1000);
+      pubsubClient.advance();
+      assertFalse(reader.advance());
+    }
+    // Wait for resend.
+    now.addAndGet(25 * 1000);
+    pubsubClient.advance();
+    // Reread the same message.
+    assertTrue(reader.advance());
+    assertEquals(DATA, reader.getCurrent());
+    // Now ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void multipleReaders() throws IOException {
+    List<IncomingMessage> incoming = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      String data = String.format("data_%d", i);
+      String ackid = String.format("ackid_%d", i);
+      incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID));
+    }
+    setupOneMessage(incoming);
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    // Consume two messages, only read one.
+    assertTrue(reader.start());
+    assertEquals("data_0", reader.getCurrent());
+
+    // Grab checkpoint.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    assertEquals(1, checkpoint.notYetReadIds.size());
+    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+    // Read second message.
+    assertTrue(reader.advance());
+    assertEquals("data_1", reader.getCurrent());
+
+    // Restore from checkpoint.
+    byte[] checkpointBytes =
+        CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint);
+    checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(),
+                                                checkpointBytes);
+    assertEquals(1, checkpoint.notYetReadIds.size());
+    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+    // Re-read second message.
+    reader = primSource.createReader(p.getOptions(), checkpoint);
+    assertTrue(reader.start());
+    assertEquals("data_1", reader.getCurrent());
+
+    // We are done.
+    assertFalse(reader.advance());
+
+    // ACK final message.
+    checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  private long messageNumToTimestamp(int messageNum) {
+    return TIMESTAMP + messageNum * 100;
+  }
+
+  @Test
+  public void readManyMessages() throws IOException {
+    Map<String, Integer> dataToMessageNum = new HashMap<>();
+
+    final int m = 97;
+    final int n = 10000;
+    List<IncomingMessage> incoming = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      // Make the messages timestamps slightly out of order.
+      int messageNum = ((i / m) * m) + (m - 1) - (i % m);
+      String data = String.format("data_%d", messageNum);
+      dataToMessageNum.put(data, messageNum);
+      String recid = String.format("recordid_%d", messageNum);
+      String ackId = String.format("ackid_%d", messageNum);
+      incoming.add(new IncomingMessage(data.getBytes(), null, messageNumToTimestamp(messageNum), 0,
+                                       ackId, recid));
+    }
+    setupOneMessage(incoming);
+
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
+
+    for (int i = 0; i < n; i++) {
+      if (i == 0) {
+        assertTrue(reader.start());
+      } else {
+        assertTrue(reader.advance());
+      }
+      // We'll checkpoint and ack within the 2min limit.
+      now.addAndGet(30);
+      pubsubClient.advance();
+      String data = reader.getCurrent();
+      Integer messageNum = dataToMessageNum.remove(data);
+      // No duplicate messages.
+      assertNotNull(messageNum);
+      // Preserve timestamp.
+      assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp());
+      // Preserve record id.
+      String recid = String.format("recordid_%d", messageNum);
+      assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId());
+
+      if (i % 1000 == 999) {
+        // Estimated watermark can never get ahead of actual outstanding messages.
+        long watermark = reader.getWatermark().getMillis();
+        long minOutstandingTimestamp = Long.MAX_VALUE;
+        for (Integer outstandingMessageNum : dataToMessageNum.values()) {
+          minOutstandingTimestamp =
+              Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum));
+        }
+        assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
+        // Ack messages, but only every other finalization.
+        PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+        if (i % 2000 == 1999) {
+          checkpoint.finalizeCheckpoint();
+        }
+      }
+    }
+    // We are done.
+    assertFalse(reader.advance());
+    // We saw each message exactly once.
+    assertTrue(dataToMessageNum.isEmpty());
+    reader.close();
+  }
+
+  @Test
+  public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
+    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+    factory = PubsubTestClient.createFactoryForCreateSubscription();
+    PubsubUnboundedSource<String> source =
+        new PubsubUnboundedSource<>(
+            factory,
+            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
+            StaticValueProvider.of(topicPath),
+            null,
+            StringUtf8Coder.of(),
+            null,
+            null,
+            null);
+    assertThat(source.getSubscription(), nullValue());
+
+    assertThat(source.getSubscription(), nullValue());
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<PubsubSource<String>> splits =
+        (new PubsubSource<>(source)).generateInitialSplits(3, options);
+    // We have at least one returned split
+    assertThat(splits, hasSize(greaterThan(0)));
+    for (PubsubSource<String> split : splits) {
+      // Each split is equal
+      assertThat(split, equalTo(splits.get(0)));
+    }
+
+    assertThat(splits.get(0).subscriptionPath, not(nullValue()));
+  }
+
+  @Test
+  public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
+    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+    factory = PubsubTestClient.createFactoryForCreateSubscription();
+    PubsubUnboundedSource<String> source =
+        new PubsubUnboundedSource<>(
+            factory,
+            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
+            StaticValueProvider.of(topicPath),
+            null,
+            StringUtf8Coder.of(),
+            null,
+            null,
+            null);
+    assertThat(source.getSubscription(), nullValue());
+
+    assertThat(source.getSubscription(), nullValue());
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    PubsubSource<String> actualSource = new PubsubSource<>(source);
+    PubsubReader<String> reader = actualSource.createReader(options, null);
+    SubscriptionPath createdSubscription = reader.subscription;
+    assertThat(createdSubscription, not(nullValue()));
+
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
+
+    checkpoint.finalizeCheckpoint();
+    PubsubCheckpoint<String> deserCheckpoint =
+        CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
+    assertThat(checkpoint.subscriptionPath, not(nullValue()));
+    assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
+
+    PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
+    PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
+
+    assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
+    assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
+  }
+
+  /**
+   * Tests that checkpoints finalized after the reader is closed succeed.
+   */
+  @Test
+  public void closeWithActiveCheckpoints() throws Exception {
+    setupOneMessage();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    reader.start();
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    reader.close();
+    checkpoint.finalizeCheckpoint();
+  }
+}