You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:59 UTC
[28/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the
google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
new file mode 100644
index 0000000..d290994
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Tests for PubsubJsonClient.
+ */
+@RunWith(JUnit4.class)
+public class PubsubJsonClientTest {
+ private Pubsub mockPubsub; // deep-stubbed Mockito mock, rebuilt in setup()
+ private PubsubClient client; // PubsubJsonClient under test, backed by mockPubsub
+
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ private static final long REQ_TIME = 1234L; // request time handed to pull()
+ private static final long PUB_TIME = 3456L; // publish time set on the stubbed message
+ private static final long MESSAGE_TIME = 6789L; // value stored under TIMESTAMP_LABEL
+ private static final String TIMESTAMP_LABEL = "timestamp"; // attribute key for the timestamp
+ private static final String ID_LABEL = "id"; // attribute key for the record id
+ private static final String MESSAGE_ID = "testMessageId";
+ private static final String DATA = "testData"; // message payload
+ private static final String RECORD_ID = "testRecordId"; // value stored under ID_LABEL
+ private static final String ACK_ID = "testAckId";
+
+ @Before
+ public void setup() throws IOException {
+ this.mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); // allows chained stubs
+ this.client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, this.mockPubsub);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ this.client.close();
+ this.client = null; // drop the per-test fixtures
+ this.mockPubsub = null;
+ }
+
+ /** Pulls one stubbed message through the JSON client and checks each decoded field. */
+ @Test
+ public void pullOneMessage() throws IOException {
+ // Explicit charset: the no-arg getBytes()/new String(byte[]) depend on the platform default.
+ java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+ String expectedSubscription = SUBSCRIPTION.getPath();
+ PullRequest expectedRequest =
+ new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+ PubsubMessage expectedPubsubMessage = new PubsubMessage()
+ .setMessageId(MESSAGE_ID)
+ .encodeData(DATA.getBytes(utf8))
+ .setPublishTime(String.valueOf(PUB_TIME))
+ .setAttributes(
+ ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+ ID_LABEL, RECORD_ID));
+ ReceivedMessage expectedReceivedMessage =
+ new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
+ PullResponse expectedResponse =
+ new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
+ Mockito.when((Object) (mockPubsub.projects().subscriptions()
+ .pull(expectedSubscription, expectedRequest).execute()))
+ .thenReturn(expectedResponse);
+ List<IncomingMessage> actualMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+ assertEquals(1, actualMessages.size());
+ IncomingMessage actualMessage = actualMessages.get(0);
+ assertEquals(ACK_ID, actualMessage.ackId);
+ assertEquals(DATA, new String(actualMessage.elementBytes, utf8));
+ assertEquals(RECORD_ID, actualMessage.recordId);
+ assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+ assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+ }
+
+ /** Publishes one message and expects the request to also carry the timestamp and id attributes. */
+ @Test
+ public void publishOneMessage() throws IOException {
+ java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8; // not platform default
+ String expectedTopic = TOPIC.getPath();
+ PubsubMessage expectedPubsubMessage = new PubsubMessage()
+ .encodeData(DATA.getBytes(utf8))
+ .setAttributes(
+ ImmutableMap.<String, String>builder()
+ .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
+ .put(ID_LABEL, RECORD_ID)
+ .put("k", "v").build());
+ PublishRequest expectedRequest = new PublishRequest()
+ .setMessages(ImmutableList.of(expectedPubsubMessage));
+ PublishResponse expectedResponse = new PublishResponse()
+ .setMessageIds(ImmutableList.of(MESSAGE_ID));
+ Mockito.when((Object) (mockPubsub.projects().topics()
+ .publish(expectedTopic, expectedRequest).execute()))
+ .thenReturn(expectedResponse);
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("k", "v");
+ OutgoingMessage actualMessage = new OutgoingMessage(
+ DATA.getBytes(utf8), attrs, MESSAGE_TIME, RECORD_ID);
+ int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+ assertEquals(1, n);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
new file mode 100644
index 0000000..18180af
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for PubsubTestClient.
+ */
+@RunWith(JUnit4.class)
+public class PubsubTestClientTest {
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ private static final long REQ_TIME = 1234L; // fake-clock value at the first pull
+ private static final long MESSAGE_TIME = 6789L; // timestamp given to the test messages
+ private static final String MESSAGE_ID = "testMessageId"; // id given to the test messages
+ private static final String DATA = "testData"; // message payload
+ private static final String ACK_ID = "testAckId";
+ private static final int ACK_TIMEOUT_S = 60; // ack deadline (seconds) given to the pull factory
+
+ @Test
+ public void pullOneMessage() throws IOException {
+ final AtomicLong now = new AtomicLong();
+ Clock clock = new Clock() {
+ @Override
+ public long currentTimeMillis() {
+ return now.get();
+ }
+ };
+ IncomingMessage expectedIncomingMessage =
+ new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
+ Lists.newArrayList(expectedIncomingMessage))) {
+ try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
+ now.set(REQ_TIME);
+ client.advance();
+ List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+ // Timeout on ACK.
+ now.addAndGet((ACK_TIMEOUT_S + 10) * 1000);
+ client.advance();
+ incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
+ now.addAndGet(10 * 1000);
+ client.advance();
+ // Extend ack
+ client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+ // Timeout on extended ACK
+ now.addAndGet(30 * 1000);
+ client.advance();
+ incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
+ // Extend ack
+ client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+ // Ack
+ now.addAndGet(15 * 1000);
+ client.advance();
+ client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+ }
+ }
+ }
+
+ @Test
+ public void publishOneMessage() throws IOException {
+ OutgoingMessage expectedOutgoingMessage =
+ new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID);
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(
+ TOPIC,
+ Sets.newHashSet(expectedOutgoingMessage),
+ ImmutableList.<OutgoingMessage>of())) {
+ try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
+ client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
new file mode 100644
index 0000000..be425d4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test PubsubUnboundedSink.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSinkTest implements Serializable {
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final String DATA = "testData"; // payload published by sendOneMessage
+ private static final Map<String, String> ATTRIBUTES =
+ ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
+ private static final long TIMESTAMP = 1234L; // element timestamp assigned by Stamp
+ private static final String TIMESTAMP_LABEL = "timestamp";
+ private static final String ID_LABEL = "id";
+ private static final int NUM_SHARDS = 10; // shard count passed to every sink under test
+
+ private static class Stamp extends DoFn<String, String> { // re-emits each element at TIMESTAMP
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ context.outputWithTimestamp(context.element(), new Instant(TIMESTAMP));
+ }
+ }
+
+ private String getRecordId(String data) { // expected id: murmur3_128 over the UTF-8 payload bytes
+ return Hashing.murmur3_128().hashBytes(data.getBytes(java.nio.charset.StandardCharsets.UTF_8)).toString();
+ }
+
+ @Rule
+ public transient TestPipeline p = TestPipeline.create(); // transient: this test class is Serializable
+
+ @Test
+ public void saneCoder() throws Exception {
+ OutgoingMessage message = new OutgoingMessage(DATA.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+ ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA));
+ CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); // value round trip
+ CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); // the coder itself
+ }
+
+ /** Publishes a single element through a custom format function with a batch size of one. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void sendOneMessage() throws IOException {
+ List<OutgoingMessage> outgoing =
+ ImmutableList.of(new OutgoingMessage(
+ DATA.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+ ATTRIBUTES,
+ TIMESTAMP, getRecordId(DATA)));
+ int batchSize = 1;
+ int batchBytes = 1;
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+ ImmutableList.<OutgoingMessage>of())) {
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
+ TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+ Duration.standardSeconds(2),
+ new SimpleFunction<String, PubsubIO.PubsubMessage>() {
+ @Override
+ public PubsubIO.PubsubMessage apply(String input) {
+ return new PubsubIO.PubsubMessage(
+ input.getBytes(java.nio.charset.StandardCharsets.UTF_8), ATTRIBUTES);
+ }
+ },
+ RecordIdMethod.DETERMINISTIC);
+ p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp())).apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
+ // message does not match the expected publish message.
+ }
+
+ /** Publishes batchSize * 10 elements so the message-count limit forces several batches. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void sendMoreThanOneBatchByNumMessages() throws IOException {
+ List<OutgoingMessage> outgoing = new ArrayList<>();
+ List<String> data = new ArrayList<>();
+ int batchSize = 2;
+ int batchBytes = 1000;
+ for (int i = 0; i < batchSize * 10; i++) {
+ String str = String.valueOf(i);
+ byte[] payload = str.getBytes(java.nio.charset.StandardCharsets.UTF_8); // as StringUtf8Coder
+ outgoing.add(new OutgoingMessage(
+ payload, ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
+ data.add(str);
+ }
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+ ImmutableList.<OutgoingMessage>of())) {
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
+ TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+ Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
+ p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
+ // message does not match the expected publish message.
+ }
+
+ /** Publishes elements that are each at least batchBytes long so the byte limit splits batches. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void sendMoreThanOneBatchByByteSize() throws IOException {
+ List<OutgoingMessage> outgoing = new ArrayList<>();
+ List<String> data = new ArrayList<>();
+ int batchSize = 100;
+ int batchBytes = 10;
+ int n = 0;
+ while (n < batchBytes * 10) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < batchBytes; i++) {
+ sb.append(n);
+ }
+ String str = sb.toString();
+ byte[] payload = str.getBytes(java.nio.charset.StandardCharsets.UTF_8); // as StringUtf8Coder
+ outgoing.add(new OutgoingMessage(
+ payload, ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
+ data.add(str);
+ n += str.length();
+ }
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+ ImmutableList.<OutgoingMessage>of())) {
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
+ StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+ null, RecordIdMethod.DETERMINISTIC);
+ p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
+ // message does not match the expected publish message.
+ }
+
+ // TODO: We would like to test that failed Pubsub publish calls cause the already assigned
+ // (and random) record ids to be reused. However, that can't be done without the test runner
+ // supporting retrying bundles.
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
new file mode 100644
index 0000000..d2e88c3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubCheckpoint;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubReader;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test PubsubUnboundedSource.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSourceTest {
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ private static final String DATA = "testData"; // payload of the default single test message
+ private static final long TIMESTAMP = 1234L; // timestamp given to the test messages
+ private static final long REQ_TIME = 6373L; // initial value of the fake clock
+ private static final String TIMESTAMP_LABEL = "timestamp";
+ private static final String ID_LABEL = "id";
+ private static final String ACK_ID = "testAckId";
+ private static final String RECORD_ID = "testRecordId";
+ private static final int ACK_TIMEOUT_S = 60; // ack deadline handed to the pull factory
+
+ private AtomicLong now; // time reported by clock; tests advance it by hand
+ private Clock clock; // fake clock backed by now
+ private PubsubTestClientFactory factory; // closed in after()
+ private PubsubSource<String> primSource; // source from which the tests create readers
+
+ @Rule
+ public TestPipeline p = TestPipeline.create(); // used in this class only for p.getOptions()
+
+ /** Installs the fake clock, a pull factory serving {@code incoming}, and the source under test. */
+ private void setupOneMessage(Iterable<IncomingMessage> incoming) {
+ now = new AtomicLong(REQ_TIME);
+ clock = new Clock() {
+ @Override
+ public long currentTimeMillis() {
+ return now.get();
+ }
+ };
+ factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
+ PubsubUnboundedSource<String> transform = new PubsubUnboundedSource<>(
+ clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
+ StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
+ primSource = new PubsubSource<>(transform);
+ }
+
+ private void setupOneMessage() { // UTF-8 payload: the source decodes it with StringUtf8Coder
+ setupOneMessage(ImmutableList.of(new IncomingMessage(
+ DATA.getBytes(java.nio.charset.StandardCharsets.UTF_8), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
+ }
+
+ @After
+ public void after() throws IOException {
+ this.factory.close();
+ this.now = null; // drop the per-test fixtures
+ this.clock = null;
+ this.primSource = null;
+ this.factory = null;
+ }
+
+ @Test
+ public void checkpointCoderIsSane() throws Exception {
+ setupOneMessage(ImmutableList.<IncomingMessage>of()); // no messages: only the coder is inspected
+ CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
+ // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make
+ // equals on checkpoints ignore those fields, we'll test serialization and deserialization
+ // of checkpoints in multipleReaders below.
+ }
+
+ /** Reads the only available message, then ACKs it by finalizing the checkpoint. */
+ @Test
+ public void readOneMessage() throws IOException {
+ setupOneMessage();
+ PubsubReader<String> pubsubReader = primSource.createReader(p.getOptions(), null);
+ // Exactly one message can be read.
+ assertTrue(pubsubReader.start());
+ assertEquals(DATA, pubsubReader.getCurrent());
+ assertFalse(pubsubReader.advance());
+ // ACK it.
+ pubsubReader.getCheckpointMark().finalizeCheckpoint();
+ pubsubReader.close();
+ }
+
+ /** Lets the ACK deadline of a read message expire and expects the message to be read again. */
+ @Test
+ public void timeoutAckAndRereadOneMessage() throws IOException {
+ setupOneMessage();
+ PubsubReader<String> pubsubReader = primSource.createReader(p.getOptions(), null);
+ PubsubTestClient testClient = (PubsubTestClient) pubsubReader.getPubsubClient();
+ assertTrue(pubsubReader.start());
+ assertEquals(DATA, pubsubReader.getCurrent());
+ // Move the clock past the ACK deadline of the message read above.
+ now.addAndGet(65 * 1000);
+ testClient.advance();
+ // The same message comes back.
+ assertTrue(pubsubReader.advance());
+ assertEquals(DATA, pubsubReader.getCurrent());
+ assertFalse(pubsubReader.advance());
+ // Finally ACK it.
+ pubsubReader.getCheckpointMark().finalizeCheckpoint();
+ pubsubReader.close();
+ }
+
+ /** Holds a message past ACK_TIMEOUT_S without a checkpoint and expects no redelivery. */
+ @Test
+ public void extendAck() throws IOException {
+ setupOneMessage();
+ PubsubReader<String> pubsubReader = primSource.createReader(p.getOptions(), null);
+ PubsubTestClient testClient = (PubsubTestClient) pubsubReader.getPubsubClient();
+ // Read the first message without checkpointing it.
+ assertTrue(pubsubReader.start());
+ assertEquals(DATA, pubsubReader.getCurrent());
+ // First ack extension.
+ now.addAndGet(55 * 1000);
+ testClient.advance();
+ assertFalse(pubsubReader.advance());
+ // Second ack extension.
+ now.addAndGet(25 * 1000);
+ testClient.advance();
+ assertFalse(pubsubReader.advance());
+ // Finally ACK the message.
+ pubsubReader.getCheckpointMark().finalizeCheckpoint();
+ pubsubReader.close();
+ }
+
+ /** Waits until the ack has expired despite extensions and expects the message to be reread. */
+ @Test
+ public void timeoutAckExtensions() throws IOException {
+ setupOneMessage();
+ PubsubReader<String> pubsubReader = primSource.createReader(p.getOptions(), null);
+ PubsubTestClient testClient = (PubsubTestClient) pubsubReader.getPubsubClient();
+ // Read the first message without checkpointing it.
+ assertTrue(pubsubReader.start());
+ assertEquals(DATA, pubsubReader.getCurrent());
+ // First ack extension.
+ now.addAndGet(55 * 1000);
+ testClient.advance();
+ assertFalse(pubsubReader.advance());
+ // Keep going until the ack expires.
+ for (int round = 0; round < 3; round++) {
+ now.addAndGet(25 * 1000);
+ testClient.advance();
+ assertFalse(pubsubReader.advance());
+ }
+ // Give the resend time to happen.
+ now.addAndGet(25 * 1000);
+ testClient.advance();
+ // The same message is read again.
+ assertTrue(pubsubReader.advance());
+ assertEquals(DATA, pubsubReader.getCurrent());
+ // Finally ACK the message.
+ pubsubReader.getCheckpointMark().finalizeCheckpoint();
+ pubsubReader.close();
+ }
+
+ /** Round-trips a checkpoint through its coder and resumes reading from it with a new reader. */
+ @Test
+ public void multipleReaders() throws IOException {
+ List<IncomingMessage> incoming = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ String data = String.format("data_%d", i);
+ String ackid = String.format("ackid_%d", i);
+ // UTF-8 payload: the source decodes it with StringUtf8Coder.
+ byte[] payload = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ incoming.add(new IncomingMessage(payload, null, TIMESTAMP, 0, ackid, RECORD_ID));
+ }
+ setupOneMessage(incoming);
+ PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ // Consume two messages, only read one.
+ assertTrue(reader.start());
+ assertEquals("data_0", reader.getCurrent());
+
+ // Grab checkpoint.
+ PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ checkpoint.finalizeCheckpoint();
+ assertEquals(1, checkpoint.notYetReadIds.size());
+ assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+ // Read second message.
+ assertTrue(reader.advance());
+ assertEquals("data_1", reader.getCurrent());
+
+ // Restore from checkpoint.
+ byte[] checkpointBytes =
+ CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint);
+ checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(),
+ checkpointBytes);
+ assertEquals(1, checkpoint.notYetReadIds.size());
+ assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+ // Re-read second message.
+ reader = primSource.createReader(p.getOptions(), checkpoint);
+ assertTrue(reader.start());
+ assertEquals("data_1", reader.getCurrent());
+ // We are done.
+ assertFalse(reader.advance());
+ // ACK final message.
+ checkpoint = reader.getCheckpointMark();
+ checkpoint.finalizeCheckpoint();
+ reader.close();
+ }
+
+ private long messageNumToTimestamp(int messageNum) { // 100 apart; 100L keeps the product in long
+ return TIMESTAMP + messageNum * 100L;
+ }
+
+ /** Reads 10000 slightly out-of-order messages, checking ids, timestamps and the watermark. */
+ @Test
+ public void readManyMessages() throws IOException {
+ java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8; // not platform default
+ Map<String, Integer> dataToMessageNum = new HashMap<>();
+ final int m = 97;
+ final int n = 10000;
+ List<IncomingMessage> incoming = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ // Make the messages timestamps slightly out of order.
+ int messageNum = ((i / m) * m) + (m - 1) - (i % m);
+ String data = String.format("data_%d", messageNum);
+ dataToMessageNum.put(data, messageNum);
+ String recid = String.format("recordid_%d", messageNum);
+ String ackId = String.format("ackid_%d", messageNum);
+ incoming.add(new IncomingMessage(
+ data.getBytes(utf8), null, messageNumToTimestamp(messageNum), 0, ackId, recid));
+ }
+ setupOneMessage(incoming);
+
+ PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
+ for (int i = 0; i < n; i++) {
+ if (i == 0) {
+ assertTrue(reader.start());
+ } else {
+ assertTrue(reader.advance());
+ }
+ // We'll checkpoint and ack within the 2min limit.
+ now.addAndGet(30);
+ pubsubClient.advance();
+ String data = reader.getCurrent();
+ Integer messageNum = dataToMessageNum.remove(data);
+ // No duplicate messages.
+ assertNotNull(messageNum);
+ // Preserve timestamp.
+ assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp());
+ // Preserve record id.
+ String recid = String.format("recordid_%d", messageNum);
+ assertArrayEquals(recid.getBytes(utf8), reader.getCurrentRecordId());
+
+ if (i % 1000 == 999) {
+ // Estimated watermark can never get ahead of actual outstanding messages.
+ long watermark = reader.getWatermark().getMillis();
+ long minOutstandingTimestamp = Long.MAX_VALUE;
+ for (Integer outstandingMessageNum : dataToMessageNum.values()) {
+ minOutstandingTimestamp =
+ Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum));
+ }
+ assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
+ // Ack messages, but only every other finalization.
+ PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ if (i % 2000 == 1999) {
+ checkpoint.finalizeCheckpoint();
+ }
+ }
+ }
+ // We are done.
+ assertFalse(reader.advance());
+ // We saw each message exactly once.
+ assertTrue(dataToMessageNum.isEmpty());
+ reader.close();
+ }
+
+ /** With only a topic configured, every generated split must carry a subscription path. */
+ @Test
+ public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
+ TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+ factory = PubsubTestClient.createFactoryForCreateSubscription();
+ PubsubUnboundedSource<String> source =
+ new PubsubUnboundedSource<>(
+ factory,
+ StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
+ StaticValueProvider.of(topicPath),
+ null,
+ StringUtf8Coder.of(),
+ null,
+ null,
+ null);
+ // Before splitting, the transform itself reports no subscription.
+ assertThat(source.getSubscription(), nullValue());
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<PubsubSource<String>> splits =
+ (new PubsubSource<>(source)).generateInitialSplits(3, options);
+ // We have at least one returned split
+ assertThat(splits, hasSize(greaterThan(0)));
+ for (PubsubSource<String> split : splits) {
+ // Each split is equal
+ assertThat(split, equalTo(splits.get(0)));
+ }
+
+ assertThat(splits.get(0).subscriptionPath, not(nullValue()));
+ }
+
+ /** With only a topic configured, a new reader gets a subscription that its checkpoints keep. */
+ @Test
+ public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
+ TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+ factory = PubsubTestClient.createFactoryForCreateSubscription();
+ PubsubUnboundedSource<String> source =
+ new PubsubUnboundedSource<>(
+ factory,
+ StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
+ StaticValueProvider.of(topicPath),
+ null,
+ StringUtf8Coder.of(),
+ null,
+ null,
+ null);
+ // Before a reader exists, the transform itself reports no subscription.
+ assertThat(source.getSubscription(), nullValue());
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ PubsubSource<String> actualSource = new PubsubSource<>(source);
+ PubsubReader<String> reader = actualSource.createReader(options, null);
+ SubscriptionPath createdSubscription = reader.subscription;
+ assertThat(createdSubscription, not(nullValue()));
+
+ PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
+
+ checkpoint.finalizeCheckpoint();
+ PubsubCheckpoint<String> deserCheckpoint =
+ CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
+ assertThat(checkpoint.subscriptionPath, not(nullValue()));
+ assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
+
+ PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
+ PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
+
+ assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
+ assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
+ }
+
+ /**
+ * Verifies that a checkpoint can still be finalized once its reader has been closed.
+ */
+ @Test
+ public void closeWithActiveCheckpoints() throws Exception {
+ setupOneMessage();
+ PubsubReader<String> pubsubReader = primSource.createReader(p.getOptions(), null);
+ pubsubReader.start();
+ PubsubCheckpoint<String> pending = pubsubReader.getCheckpointMark(); // taken before close
+ pubsubReader.close();
+ pending.finalizeCheckpoint();
+ }
+}