You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/04 15:23:05 UTC
[2/3] incubator-beam git commit: [BEAM-53] Add PubsubApiaryClient,
PubsubTestClient
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
new file mode 100644
index 0000000..4a47c30
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods.
+ */
+public class PubsubTestClient extends PubsubClient {
+ /**
+ * Returns a factory whose clients are set up for {@link #publish} only: each client is
+ * constructed with {@code expectedTopic} and {@code expectedOutgoingMessages}, and with no
+ * subscription or incoming messages.
+ *
+ * <p>Every client created by the factory is handed the same {@code expectedOutgoingMessages}
+ * instance, and {@link #publish} removes each published message from it.
+ */
+ public static PubsubClientFactory createFactoryForPublish(
+ final TopicPath expectedTopic,
+ final Set<OutgoingMessage> expectedOutgoingMessages) {
+ return new PubsubClientFactory() {
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ // The labels and options are not used by the test client.
+ return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
+ }
+ };
+ }
+
+ /**
+ * Returns a factory whose clients are set up for {@link #pull}, {@link #acknowledge} and
+ * {@link #modifyAckDeadline}: each client is constructed with {@code expectedSubscription},
+ * {@code ackTimeoutSec} and {@code expectedIncomingMessages}, and with no topic or outgoing
+ * messages.
+ *
+ * <p>Every client created by the factory is handed the same {@code expectedIncomingMessages}
+ * instance. {@link #pull} removes messages from it and {@link #advanceTo} adds expired ones
+ * back, so the list must be modifiable.
+ */
+ public static PubsubClientFactory createFactoryForPull(
+ @Nullable final SubscriptionPath expectedSubscription,
+ final int ackTimeoutSec,
+ @Nullable final List<IncomingMessage> expectedIncomingMessages) {
+ return new PubsubClientFactory() {
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ // The labels and options are not used by the test client.
+ return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
+ null, expectedIncomingMessages);
+ }
+ };
+ }
+
+ /**
+ * Only publish calls for this topic are allowed.
+ */
+ @Nullable
+ private TopicPath expectedTopic;
+ /**
+ * Only pull calls for this subscription are allowed.
+ */
+ @Nullable
+ private SubscriptionPath expectedSubscription;
+
+ /**
+ * Timeout to simulate.
+ */
+ private int ackTimeoutSec;
+
+ /**
+ * Messages yet to be seen in a {@link #publish} call.
+ */
+ @Nullable
+ private Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+ /**
+ * Messages waiting to be received by a {@link #pull} call.
+ */
+ @Nullable
+ private List<IncomingMessage> remainingPendingIncomingMessages;
+
+ /**
+ * Messages which have been returned from a {@link #pull} call and
+ * not yet ACKed by an {@link #acknowledge} call.
+ */
+ private Map<String, IncomingMessage> pendingAckIncommingMessages;
+
+ /**
+ * When above messages are due to have their ACK deadlines expire.
+ */
+ private Map<String, Long> ackDeadline;
+
+ /**
+ * Current time.
+ */
+ private long nowMsSinceEpoch;
+
+ /**
+ * Creates a test client from the given expectations. The collections are stored as passed
+ * (not copied), so later calls on this client modify them.
+ *
+ * @param expectedTopic the only topic {@link #publish} accepts, or {@code null}
+ * @param expectedSubscription the only subscription the pull-side methods accept, or
+ * {@code null}
+ * @param ackTimeoutSec ACK timeout to simulate, in seconds
+ * @param expectedOutgoingMessages messages expected to be published, or {@code null}
+ * @param expectedIncomingMessages messages available to be pulled, or {@code null}
+ */
+ @VisibleForTesting
+ PubsubTestClient(
+ @Nullable TopicPath expectedTopic,
+ @Nullable SubscriptionPath expectedSubscription,
+ int ackTimeoutSec,
+ @Nullable Set<OutgoingMessage> expectedOutgoingMessages,
+ @Nullable List<IncomingMessage> expectedIncomingMessages) {
+ this.expectedTopic = expectedTopic;
+ this.expectedSubscription = expectedSubscription;
+ this.ackTimeoutSec = ackTimeoutSec;
+
+ this.remainingExpectedOutgoingMessages = expectedOutgoingMessages;
+ this.remainingPendingIncomingMessages = expectedIncomingMessages;
+
+ this.pendingAckIncommingMessages = new HashMap<>();
+ this.ackDeadline = new HashMap<>();
+ // Start at the smallest possible time so that the first advanceTo call is never "backwards".
+ this.nowMsSinceEpoch = Long.MIN_VALUE;
+ }
+
+ /**
+  * Advance wall-clock time to {@code newNowMsSinceEpoch}. This will simulate Pubsub expiring
+  * outstanding ACKs: every message whose ACK deadline is at or before the new time is moved
+  * back to the pending messages and can be returned by {@link #pull} again.
+  *
+  * @param newNowMsSinceEpoch the new simulated time; must not be earlier than the current one
+  * @throws IllegalArgumentException if {@code newNowMsSinceEpoch} is before the current time
+  */
+ public void advanceTo(long newNowMsSinceEpoch) {
+   // Guava's Preconditions only substitutes %s placeholders, so %s is used instead of %d.
+   checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch,
+       "Cannot advance time backwards from %s to %s", nowMsSinceEpoch,
+       newNowMsSinceEpoch);
+   nowMsSinceEpoch = newNowMsSinceEpoch;
+   // Any messages whose ACKs timed out are available for re-pulling.
+   Iterator<Map.Entry<String, Long>> deadlineItr = ackDeadline.entrySet().iterator();
+   while (deadlineItr.hasNext()) {
+     Map.Entry<String, Long> entry = deadlineItr.next();
+     if (entry.getValue() <= nowMsSinceEpoch) {
+       remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey()));
+       // Remove through the iterator to avoid a ConcurrentModificationException.
+       deadlineItr.remove();
+     }
+   }
+ }
+
+ /**
+  * Checks that every expectation this client was created with has been met, then drops the
+  * corresponding state.
+  *
+  * <p>For a client with expected outgoing messages, all of them must have been published. For
+  * a client with incoming messages, all of them must have been pulled and ACKed.
+  *
+  * @throws IllegalStateException if any message is still waiting to be published, pulled or
+  *     ACKed
+  */
+ @Override
+ public void close() {
+   if (remainingExpectedOutgoingMessages != null) {
+     // Leftover expected outgoing messages were never passed to publish().
+     // Guava's Preconditions only substitutes %s placeholders.
+     checkState(remainingExpectedOutgoingMessages.isEmpty(),
+         "Failed to publish %s messages", remainingExpectedOutgoingMessages.size());
+     remainingExpectedOutgoingMessages = null;
+   }
+   if (remainingPendingIncomingMessages != null) {
+     // Leftover pending incoming messages were never returned by pull().
+     checkState(remainingPendingIncomingMessages.isEmpty(),
+         "Failed to pull %s messages", remainingPendingIncomingMessages.size());
+     checkState(pendingAckIncommingMessages.isEmpty(),
+         "Failed to ACK %s messages", pendingAckIncommingMessages.size());
+     checkState(ackDeadline.isEmpty(),
+         "Failed to ACK %s messages", ackDeadline.size());
+     remainingPendingIncomingMessages = null;
+     pendingAckIncommingMessages = null;
+     ackDeadline = null;
+   }
+ }
+
+ /**
+  * Simulates a publish: checks that {@code topic} is the expected topic and that every message
+  * is one of the remaining expected outgoing messages, removing each one as it is seen.
+  *
+  * @param topic must equal the expected topic
+  * @param outgoingMessages messages to publish; each must still be expected
+  * @return the number of messages in {@code outgoingMessages}
+  * @throws NullPointerException if this client has no expected topic or outgoing messages
+  * @throws IllegalStateException if the topic does not match or a message is not expected
+  */
+ @Override
+ public int publish(
+     TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+   checkNotNull(expectedTopic, "Missing expected topic");
+   checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages");
+   checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic,
+       expectedTopic);
+   for (OutgoingMessage outgoingMessage : outgoingMessages) {
+     // remove() returns false for a message that is unknown or was already published.
+     checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage),
+         "Unexpected outgoing message %s", outgoingMessage);
+   }
+   return outgoingMessages.size();
+ }
+
+ /**
+  * Simulates a pull: returns up to {@code batchSize} pending messages stamped with the request
+  * time, and records each one as awaiting an ACK with a deadline {@code ackTimeoutSec} seconds
+  * after the request time.
+  *
+  * @param requestTimeMsSinceEpoch must equal the current simulated time (see
+  *     {@link #advanceTo})
+  * @param subscription must equal the expected subscription
+  * @param batchSize maximum number of messages to return; a non-positive value returns none
+  * @param returnImmediately must be {@code true}
+  * @return the pulled messages, possibly empty
+  * @throws NullPointerException if this client has no expected subscription or incoming
+  *     messages
+  * @throws IllegalStateException if the time or subscription does not match, or
+  *     {@code returnImmediately} is {@code false}
+  */
+ @Override
+ public List<IncomingMessage> pull(
+     long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+     boolean returnImmediately) throws IOException {
+   // Guava's Preconditions only substitutes %s placeholders, so %s is used instead of %d.
+   checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch,
+       "Simulated time %s does not match request time %s", nowMsSinceEpoch,
+       requestTimeMsSinceEpoch);
+   checkNotNull(expectedSubscription, "Missing expected subscription");
+   checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+   checkState(subscription.equals(expectedSubscription),
+       "Subscription %s does not match expected %s", subscription, expectedSubscription);
+   checkState(returnImmediately, "PubsubTestClient only supports returning immediately");
+
+   // Multiply as a long so that a large timeout cannot overflow int arithmetic.
+   long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ackTimeoutSec * 1000L;
+
+   List<IncomingMessage> incomingMessages = new ArrayList<>();
+   Iterator<IncomingMessage> pendItr = remainingPendingIncomingMessages.iterator();
+   // The limit is checked before taking a message so a non-positive batchSize yields nothing.
+   while (pendItr.hasNext() && incomingMessages.size() < batchSize) {
+     IncomingMessage incomingMessage = pendItr.next();
+     pendItr.remove();
+     IncomingMessage incomingMessageWithRequestTime =
+         incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+     incomingMessages.add(incomingMessageWithRequestTime);
+     pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId,
+         incomingMessageWithRequestTime);
+     ackDeadline.put(incomingMessageWithRequestTime.ackId, deadlineMsSinceEpoch);
+   }
+   return incomingMessages;
+ }
+
+ /**
+  * Simulates ACKing messages: each ACK id must belong to a message that was pulled and is
+  * still outstanding; its deadline and pending-ACK entries are dropped.
+  *
+  * @param subscription must equal the expected subscription
+  * @param ackIds ACK ids of the messages to acknowledge
+  * @throws NullPointerException if this client has no expected subscription or incoming
+  *     messages
+  * @throws IllegalStateException if the subscription does not match or an ACK id is not
+  *     outstanding
+  */
+ @Override
+ public void acknowledge(
+     SubscriptionPath subscription,
+     List<String> ackIds) throws IOException {
+   checkNotNull(expectedSubscription, "Missing expected subscription");
+   checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+   boolean subscriptionMatches = subscription.equals(expectedSubscription);
+   checkState(subscriptionMatches,
+       "Subscription %s does not match expected %s", subscription, expectedSubscription);
+
+   for (String outstandingId : ackIds) {
+     Long removedDeadline = ackDeadline.remove(outstandingId);
+     checkState(removedDeadline != null,
+         "No message with ACK id %s is outstanding", outstandingId);
+     IncomingMessage removedMessage = pendingAckIncommingMessages.remove(outstandingId);
+     checkState(removedMessage != null,
+         "No message with ACK id %s is outstanding", outstandingId);
+   }
+ }
+
+ /**
+  * Simulates extending ACK deadlines: each ACK id must belong to an outstanding message, and
+  * its deadline is reset to {@code deadlineSeconds} seconds after the current simulated time.
+  *
+  * @param subscription must equal the expected subscription
+  * @param ackIds ACK ids of the messages whose deadline should change
+  * @param deadlineSeconds new deadline, in seconds from the current simulated time
+  * @throws NullPointerException if this client has no expected subscription or incoming
+  *     messages
+  * @throws IllegalStateException if the subscription does not match or an ACK id is not
+  *     outstanding
+  */
+ @Override
+ public void modifyAckDeadline(
+     SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+   checkNotNull(expectedSubscription, "Missing expected subscription");
+   checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
+   checkState(subscription.equals(expectedSubscription),
+       "Subscription %s does not match expected %s", subscription, expectedSubscription);
+
+   // Multiply as a long so that a large deadline cannot overflow int arithmetic.
+   long newDeadlineMsSinceEpoch = nowMsSinceEpoch + deadlineSeconds * 1000L;
+   for (String ackId : ackIds) {
+     checkState(ackDeadline.remove(ackId) != null,
+         "No message with ACK id %s is outstanding", ackId);
+     checkState(pendingAckIncommingMessages.containsKey(ackId),
+         "No message with ACK id %s is outstanding", ackId);
+     ackDeadline.put(ackId, newDeadlineMsSinceEpoch);
+   }
+ }
+
+ // The topic and subscription management operations below are not simulated by this test
+ // client; each one throws UnsupportedOperationException.
+
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<SubscriptionPath> listSubscriptions(
+ ProjectPath project, TopicPath topic) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the simulated ACK timeout this client was created with. The {@code subscription}
+ * argument is not checked against the expected subscription.
+ */
+ @Override
+ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+ return ackTimeoutSec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 9082ce3..6daecdb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -105,7 +105,11 @@ public class Transport {
/**
* Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+ *
+ * @deprecated Use an appropriate
+ * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory}
*/
+ @Deprecated
public static Pubsub.Builder
newPubsubClient(PubsubOptions options) {
return new Pubsub.Builder(getTransport(), getJsonFactory(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 1e5bf51..eaf452d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -24,22 +24,13 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
-import com.google.api.services.pubsub.model.PubsubMessage;
-
import org.joda.time.Duration;
-import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.HashMap;
-
-import javax.annotation.Nullable;
-
/**
* Tests for PubsubIO Read and Write transforms.
*/
@@ -90,154 +81,6 @@ public class PubsubIOTest {
.toString());
}
- /**
- * Helper function that creates a {@link PubsubMessage} with the given timestamp registered as
- * an attribute with the specified label.
- *
- * <p>If {@code label} is {@code null}, then the attributes are {@code null}.
- *
- * <p>Else, if {@code timestamp} is {@code null}, then attributes are present but have no key for
- * the label.
- */
- private static PubsubMessage messageWithTimestamp(
- @Nullable String label, @Nullable String timestamp) {
- PubsubMessage message = new PubsubMessage();
- if (label == null) {
- message.setAttributes(null);
- return message;
- }
-
- message.setAttributes(new HashMap<String, String>());
-
- if (timestamp == null) {
- return message;
- }
-
- message.getAttributes().put(label, timestamp);
- return message;
- }
-
- /**
- * Helper function that parses the given string to a timestamp through the PubSubIO plumbing.
- */
- private static Instant parseTimestamp(@Nullable String timestamp) {
- PubsubMessage message = messageWithTimestamp("mylabel", timestamp);
- return PubsubIO.assignMessageTimestamp(message, "mylabel", Clock.SYSTEM);
- }
-
- @Test
- public void noTimestampLabelReturnsNow() {
- final long time = 987654321L;
- Instant timestamp = PubsubIO.assignMessageTimestamp(
- messageWithTimestamp(null, null), null, new FixedClock(time));
-
- assertEquals(new Instant(time), timestamp);
- }
-
- @Test
- public void timestampLabelWithNullAttributesThrowsError() {
- PubsubMessage message = messageWithTimestamp(null, null);
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel");
-
- PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM);
- }
-
- @Test
- public void timestampLabelSetWithMissingAttributeThrowsError() {
- PubsubMessage message = messageWithTimestamp("notMyLabel", "ignored");
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel");
-
- PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM);
- }
-
- @Test
- public void timestampLabelParsesMillisecondsSinceEpoch() {
- Long millis = 1446162101123L;
- assertEquals(new Instant(millis), parseTimestamp(millis.toString()));
- }
-
- @Test
- public void timestampLabelParsesRfc3339Seconds() {
- String rfc3339 = "2015-10-29T23:41:41Z";
- assertEquals(Instant.parse(rfc3339), parseTimestamp(rfc3339));
- }
-
- @Test
- public void timestampLabelParsesRfc3339Tenths() {
- String rfc3339tenths = "2015-10-29T23:41:41.1Z";
- assertEquals(Instant.parse(rfc3339tenths), parseTimestamp(rfc3339tenths));
- }
-
- @Test
- public void timestampLabelParsesRfc3339Hundredths() {
- String rfc3339hundredths = "2015-10-29T23:41:41.12Z";
- assertEquals(Instant.parse(rfc3339hundredths), parseTimestamp(rfc3339hundredths));
- }
-
- @Test
- public void timestampLabelParsesRfc3339Millis() {
- String rfc3339millis = "2015-10-29T23:41:41.123Z";
- assertEquals(Instant.parse(rfc3339millis), parseTimestamp(rfc3339millis));
- }
-
- @Test
- public void timestampLabelParsesRfc3339Micros() {
- String rfc3339micros = "2015-10-29T23:41:41.123456Z";
- assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros));
- // Note: micros part 456/1000 is dropped.
- assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros));
- }
-
- @Test
- public void timestampLabelParsesRfc3339MicrosRounding() {
- String rfc3339micros = "2015-10-29T23:41:41.123999Z";
- assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros));
- // Note: micros part 999/1000 is dropped, not rounded up.
- assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros));
- }
-
- @Test
- public void timestampLabelWithInvalidFormatThrowsError() {
- thrown.expect(NumberFormatException.class);
- parseTimestamp("not-a-timestamp");
- }
-
- @Test
- public void timestampLabelWithInvalidFormat2ThrowsError() {
- thrown.expect(NumberFormatException.class);
- parseTimestamp("null");
- }
-
- @Test
- public void timestampLabelWithInvalidFormat3ThrowsError() {
- thrown.expect(NumberFormatException.class);
- parseTimestamp("2015-10");
- }
-
- @Test
- public void timestampLabelParsesRfc3339WithSmallYear() {
- // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
- // This is therefore a "small year" until this difference is reconciled.
- String rfc3339SmallYear = "1582-10-15T01:23:45.123Z";
- assertEquals(Instant.parse(rfc3339SmallYear), parseTimestamp(rfc3339SmallYear));
- }
-
- @Test
- public void timestampLabelParsesRfc3339WithLargeYear() {
- // Year 9999 in range.
- String rfc3339LargeYear = "9999-10-29T23:41:41.123999Z";
- assertEquals(Instant.parse(rfc3339LargeYear), parseTimestamp(rfc3339LargeYear));
- }
-
- @Test
- public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
- thrown.expect(NumberFormatException.class);
- // Year 10000 out of range.
- parseTimestamp("10000-10-29T23:41:41.123999Z");
- }
-
@Test
public void testReadDisplayData() {
String topic = "projects/project/topics/topic";
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
new file mode 100644
index 0000000..40c31fb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubApiaryClient.
+ */
+public class PubsubApiaryClientTest {
+ private Pubsub mockPubsub;
+ private PubsubClient client;
+
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ private static final long REQ_TIME = 1234L;
+ private static final long PUB_TIME = 3456L;
+ private static final long MESSAGE_TIME = 6789L;
+ private static final String TIMESTAMP_LABEL = "timestamp";
+ private static final String ID_LABEL = "id";
+ private static final String MESSAGE_ID = "testMessageId";
+ private static final String DATA = "testData";
+ private static final String CUSTOM_ID =
+ Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+ private static final String ACK_ID = "testAckId";
+
+ /**
+ * Creates a deep-stubbed {@link Pubsub} mock and a {@link PubsubApiaryClient} using it with
+ * the test timestamp and id labels.
+ */
+ @Before
+ public void setup() throws IOException {
+ // RETURNS_DEEP_STUBS lets the tests stub chained calls such as projects().topics().publish().
+ mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
+ client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+ }
+
+ /**
+ * Closes the client and clears the fields set by {@link #setup}.
+ */
+ @After
+ public void teardown() throws IOException {
+ client.close();
+ client = null;
+ mockPubsub = null;
+ }
+
+ /**
+  * Stubs a pull response holding a single message and checks that the client turns it into
+  * one {@link IncomingMessage} with the expected ACK id, payload, record id and times.
+  */
+ @Test
+ public void pullOneMessage() throws IOException {
+   // What the client is expected to send.
+   String subscriptionPath = SUBSCRIPTION.getPath();
+   PullRequest pullRequest = new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+
+   // What the stubbed service answers with.
+   PubsubMessage pubsubMessage = new PubsubMessage()
+       .setMessageId(MESSAGE_ID)
+       .encodeData(DATA.getBytes())
+       .setPublishTime(String.valueOf(PUB_TIME))
+       .setAttributes(
+           ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+               ID_LABEL, CUSTOM_ID));
+   ReceivedMessage receivedMessage =
+       new ReceivedMessage().setMessage(pubsubMessage).setAckId(ACK_ID);
+   PullResponse pullResponse =
+       new PullResponse().setReceivedMessages(ImmutableList.of(receivedMessage));
+   Mockito.when(mockPubsub.projects()
+       .subscriptions()
+       .pull(subscriptionPath, pullRequest)
+       .execute())
+       .thenReturn(pullResponse);
+
+   List<IncomingMessage> actualMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+
+   assertEquals(1, actualMessages.size());
+   IncomingMessage onlyMessage = actualMessages.get(0);
+   assertEquals(ACK_ID, onlyMessage.ackId);
+   assertEquals(DATA, new String(onlyMessage.elementBytes));
+   assertEquals(CUSTOM_ID, new String(onlyMessage.recordId));
+   assertEquals(REQ_TIME, onlyMessage.requestTimeMsSinceEpoch);
+   assertEquals(MESSAGE_TIME, onlyMessage.timestampMsSinceEpoch);
+ }
+
+ /**
+  * Stubs the publish call for the request the client is expected to build from one
+  * {@link OutgoingMessage}, and checks that the client reports one published message.
+  */
+ @Test
+ public void publishOneMessage() throws IOException {
+   // What the client is expected to send.
+   String topicPath = TOPIC.getPath();
+   PubsubMessage pubsubMessage = new PubsubMessage()
+       .encodeData(DATA.getBytes())
+       .setAttributes(
+           ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+               ID_LABEL, CUSTOM_ID));
+   PublishRequest publishRequest =
+       new PublishRequest().setMessages(ImmutableList.of(pubsubMessage));
+
+   // What the stubbed service answers with.
+   PublishResponse publishResponse =
+       new PublishResponse().setMessageIds(ImmutableList.of(MESSAGE_ID));
+   Mockito.when(mockPubsub.projects()
+       .topics()
+       .publish(topicPath, publishRequest)
+       .execute())
+       .thenReturn(publishResponse);
+
+   OutgoingMessage outgoing = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+   int publishedCount = client.publish(TOPIC, ImmutableList.of(outgoing));
+   assertEquals(1, publishedCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
new file mode 100644
index 0000000..2250857
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Map;
+
+/**
+ * Tests for helper classes and methods in PubsubClient.
+ */
+public class PubsubClientTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ //
+ // Timestamp handling
+ //
+
+ /**
+  * Runs {@code timestamp} through {@link PubsubClient#extractTimestamp} as the value of the
+  * {@code "myLabel"} attribute and returns the result.
+  */
+ private long parse(String timestamp) {
+   Map<String, String> attributes = ImmutableMap.of("myLabel", timestamp);
+   long extractedMillis = PubsubClient.extractTimestamp("myLabel", null, attributes);
+   return extractedMillis;
+ }
+
+ /**
+  * Asserts that {@code timestamp} parses to the same milliseconds as Joda's
+  * {@link Instant#parse}.
+  */
+ private void roundTripRfc339(String timestamp) {
+   long expectedMillis = Instant.parse(timestamp).getMillis();
+   assertEquals(expectedMillis, parse(timestamp));
+ }
+
+ /**
+  * Asserts that {@code timestamp} parses to the milliseconds of {@code truncatedTimestamp}.
+  */
+ private void truncatedRfc339(String timestamp, String truncatedTimestamp) {
+   long expectedMillis = Instant.parse(truncatedTimestamp).getMillis();
+   assertEquals(expectedMillis, parse(timestamp));
+ }
+
+ @Test
+ public void noTimestampLabelReturnsPubsubPublish() {
+ final long time = 987654321L;
+ // With a null label and null attributes, the second argument's value is what comes back.
+ long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
+ assertEquals(time, timestamp);
+ }
+
+ @Test
+ public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+ // The expectation must be registered before the call that throws.
+ thrown.expect(NumberFormatException.class);
+ PubsubClient.extractTimestamp(null, "not-a-date", null);
+ }
+
+ @Test
+ public void timestampLabelWithNullAttributesThrowsError() {
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+ // A label is given but there is no attribute map at all.
+ PubsubClient.extractTimestamp("myLabel", null, null);
+ }
+
+ @Test
+ public void timestampLabelSetWithMissingAttributeThrowsError() {
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+ // The attribute map exists but has no entry for the requested label.
+ Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
+ PubsubClient.extractTimestamp("myLabel", null, map);
+ }
+
+ @Test
+ public void timestampLabelParsesMillisecondsSinceEpoch() {
+ long time = 1446162101123L;
+ // The attribute value is a plain decimal number of milliseconds.
+ Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
+ long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+ assertEquals(time, timestamp);
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339Seconds() {
+ roundTripRfc339("2015-10-29T23:41:41Z");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339Tenths() {
+ roundTripRfc339("2015-10-29T23:41:41.1Z");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339Hundredths() {
+ roundTripRfc339("2015-10-29T23:41:41.12Z");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339Millis() {
+ roundTripRfc339("2015-10-29T23:41:41.123Z");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339Micros() {
+ // Note: micros part 456/1000 is dropped.
+ truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339MicrosRounding() {
+ // Note: micros part 999/1000 is dropped, not rounded up.
+ truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
+ }
+
+ @Test
+ public void timestampLabelWithInvalidFormatThrowsError() {
+ thrown.expect(NumberFormatException.class);
+ parse("not-a-timestamp");
+ }
+
+ @Test
+ public void timestampLabelWithInvalidFormat2ThrowsError() {
+ thrown.expect(NumberFormatException.class);
+ // The literal text "null", not a null reference.
+ parse("null");
+ }
+
+ @Test
+ public void timestampLabelWithInvalidFormat3ThrowsError() {
+ thrown.expect(NumberFormatException.class);
+ // A date with only year and month is rejected.
+ parse("2015-10");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339WithSmallYear() {
+ // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
+ // This is therefore a "small year" until this difference is reconciled.
+ roundTripRfc339("1582-10-15T01:23:45.123Z");
+ }
+
+ @Test
+ public void timestampLabelParsesRfc3339WithLargeYear() {
+ // Year 9999 in range.
+ roundTripRfc339("9999-10-29T23:41:41.123999Z");
+ }
+
+ @Test
+ public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+ thrown.expect(NumberFormatException.class);
+ // Year 10000 out of range.
+ parse("10000-10-29T23:41:41.123999Z");
+ }
+
+ //
+ // Paths
+ //
+
+ @Test
+ public void projectPathFromIdWellFormed() {
+ // Expected form: projects/<project id>.
+ ProjectPath path = PubsubClient.projectPathFromId("test");
+ assertEquals("projects/test", path.getPath());
+ }
+
+ @Test
+ public void subscriptionPathFromNameWellFormed() {
+ SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
+ // getPath() is expected in the projects/<project>/subscriptions/<name> form.
+ assertEquals("projects/test/subscriptions/something", path.getPath());
+ // getV1Beta1Path() is expected in the /subscriptions/<project>/<name> form.
+ assertEquals("/subscriptions/test/something", path.getV1Beta1Path());
+ }
+
+ @Test
+ public void topicPathFromNameWellFormed() {
+ TopicPath path = PubsubClient.topicPathFromName("test", "something");
+ // getPath() is expected in the projects/<project>/topics/<name> form.
+ assertEquals("projects/test/topics/something", path.getPath());
+ // getV1Beta1Path() is expected in the /topics/<project>/<name> form.
+ assertEquals("/topics/test/something", path.getV1Beta1Path());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
new file mode 100644
index 0000000..189049c
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+
+import io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubGrpcClient.
+ *
+ * <p>The channel, credentials and both blocking stubs handed to the client are Mockito mocks.
+ * Each test stubs the one request it expects the client to send and then checks what the client
+ * returns.
+ */
+public class PubsubGrpcClientTest {
+ private ManagedChannel mockChannel;
+ private GoogleCredentials mockCredentials;
+ private PublisherGrpc.PublisherBlockingStub mockPublisherStub;
+ private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub;
+
+ private PubsubClient client;
+
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ // All three times are in milliseconds. They are distinct so that a mix-up between request
+ // time, publish time and the timestamp-label time shows up in the assertions.
+ private static final long REQ_TIME = 1234L;
+ private static final long PUB_TIME = 3456L;
+ private static final long MESSAGE_TIME = 6789L;
+ private static final String TIMESTAMP_LABEL = "timestamp";
+ private static final String ID_LABEL = "id";
+ private static final String MESSAGE_ID = "testMessageId";
+ private static final String DATA = "testData";
+ // Value carried in the ID_LABEL attribute: the murmur3_128 hash of the payload bytes.
+ private static final String CUSTOM_ID =
+ Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+ private static final String ACK_ID = "testAckId";
+
+ /** Builds fresh mocks and a client wired to them before every test. */
+ @Before
+ public void setup() throws IOException {
+ mockChannel = Mockito.mock(ManagedChannel.class);
+ mockCredentials = Mockito.mock(GoogleCredentials.class);
+ mockPublisherStub =
+ Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, Mockito.RETURNS_DEEP_STUBS);
+ mockSubscriberStub =
+ Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, Mockito.RETURNS_DEEP_STUBS);
+ client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel,
+ mockCredentials, mockPublisherStub, mockSubscriberStub);
+ }
+
+ /** Closes the client and drops every reference created by {@link #setup()}. */
+ @After
+ public void teardown() throws IOException {
+ client.close();
+ client = null;
+ mockChannel = null;
+ mockCredentials = null;
+ mockPublisherStub = null;
+ mockSubscriberStub = null;
+ }
+
+ /**
+ * Stubs a pull response holding a single message and checks the fields of the
+ * {@link IncomingMessage} the client returns for it.
+ */
+ @Test
+ public void pullOneMessage() throws IOException {
+ String expectedSubscription = SUBSCRIPTION.getPath();
+ PullRequest expectedRequest =
+ PullRequest.newBuilder()
+ .setSubscription(expectedSubscription)
+ .setReturnImmediately(true)
+ .setMaxMessages(10)
+ .build();
+ // Split PUB_TIME (milliseconds) into whole seconds plus the sub-second remainder. The
+ // protobuf Timestamp carries that remainder in nanoseconds, and 1 ms is 1,000,000 ns.
+ Timestamp timestamp = Timestamp.newBuilder()
+ .setSeconds(PUB_TIME / 1000)
+ .setNanos((int) (PUB_TIME % 1000) * 1000000)
+ .build();
+ PubsubMessage expectedPubsubMessage =
+ PubsubMessage.newBuilder()
+ .setMessageId(MESSAGE_ID)
+ .setData(
+ ByteString.copyFrom(DATA.getBytes()))
+ .setPublishTime(timestamp)
+ .putAllAttributes(
+ ImmutableMap.of(TIMESTAMP_LABEL,
+ String.valueOf(MESSAGE_TIME),
+ ID_LABEL, CUSTOM_ID))
+ .build();
+ ReceivedMessage expectedReceivedMessage =
+ ReceivedMessage.newBuilder()
+ .setMessage(expectedPubsubMessage)
+ .setAckId(ACK_ID)
+ .build();
+ PullResponse expectedResponse =
+ PullResponse.newBuilder()
+ .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
+ .build();
+ Mockito.when(mockSubscriberStub.pull(expectedRequest))
+ .thenReturn(expectedResponse);
+ List<IncomingMessage> actualMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
+ assertEquals(1, actualMessages.size());
+ IncomingMessage actualMessage = actualMessages.get(0);
+ assertEquals(ACK_ID, actualMessage.ackId);
+ assertEquals(DATA, new String(actualMessage.elementBytes));
+ assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+ assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+ // The expected timestamp is the TIMESTAMP_LABEL attribute value, not the publish time.
+ assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+ }
+
+ /**
+ * Stubs the publish call for the request the client is expected to build from one
+ * {@link OutgoingMessage} and checks that the client reports one published message.
+ */
+ @Test
+ public void publishOneMessage() throws IOException {
+ String expectedTopic = TOPIC.getPath();
+ PubsubMessage expectedPubsubMessage =
+ PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(DATA.getBytes()))
+ .putAllAttributes(
+ ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+ ID_LABEL, CUSTOM_ID))
+ .build();
+ PublishRequest expectedRequest =
+ PublishRequest.newBuilder()
+ .setTopic(expectedTopic)
+ .addAllMessages(
+ ImmutableList.of(expectedPubsubMessage))
+ .build();
+ PublishResponse expectedResponse =
+ PublishResponse.newBuilder()
+ .addAllMessageIds(ImmutableList.of(MESSAGE_ID))
+ .build();
+ Mockito.when(mockPublisherStub.publish(expectedRequest))
+ .thenReturn(expectedResponse);
+ OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+ int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+ assertEquals(1, n);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
new file mode 100644
index 0000000..7d8513b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubTestClient.
+ */
+public class PubsubTestClientTest {
+ private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ private static final long REQ_TIME = 1234L;
+ private static final long MESSAGE_TIME = 6789L;
+ private static final String MESSAGE_ID = "testMessageId";
+ private static final String DATA = "testData";
+ private static final String ACK_ID = "testAckId";
+ private static final int ACK_TIMEOUT_S = 60;
+
+ @Test
+ public void pullOneMessage() throws IOException {
+ IncomingMessage expectedIncomingMessage =
+ new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes());
+ try (PubsubTestClient client =
+ new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null,
+ Lists.newArrayList(expectedIncomingMessage))) {
+ long now = REQ_TIME;
+ client.advanceTo(now);
+ List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+ // Timeout on ACK.
+ now += (ACK_TIMEOUT_S + 10) * 1000;
+ client.advanceTo(now);
+ incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
+ now += 10 * 1000;
+ client.advanceTo(now);
+ // Extend ack
+ client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+ // Timeout on extended ACK
+ now += 30 * 1000;
+ client.advanceTo(now);
+ incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
+ // Extend ack
+ client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+ // Ack
+ now += 15 * 1000;
+ client.advanceTo(now);
+ client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+ }
+ }
+
+ @Test
+ public void publishOneMessage() throws IOException {
+ OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+ try (PubsubTestClient client =
+ new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S,
+ Sets.newHashSet(expectedOutgoingMessage), null)) {
+ client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+ }
+ }
+}