You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/11/15 23:44:43 UTC
[1/2] beam git commit: Fixed adding timestamp and id attributes to pubsub messages
Repository: beam
Updated Branches:
refs/heads/master 30886acee -> c3a96bf3d
Fixed adding timestamp and id attributes to pubsub messages
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f77a8580
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f77a8580
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f77a8580
Branch: refs/heads/master
Commit: f77a8580700b1cd79e0c824f171cccd8e2f332b3
Parents: 30886ac
Author: Nigel Kilmer <nk...@google.com>
Authored: Mon Jul 17 18:09:57 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Nov 15 15:44:23 2017 -0800
----------------------------------------------------------------------
.../sdk/io/gcp/pubsub/PubsubJsonClient.java | 34 ++++++-------
.../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 53 ++++++++++++++++++++
2 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index b745422..ab08813 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -135,23 +135,7 @@ class PubsubJsonClient extends PubsubClient {
List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
for (OutgoingMessage outgoingMessage : outgoingMessages) {
PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
- Map<String, String> attributes = outgoingMessage.attributes;
- if ((timestampAttribute != null || idAttribute != null) && attributes == null) {
- attributes = new TreeMap<>();
- }
- if (attributes != null) {
- pubsubMessage.setAttributes(attributes);
- }
-
- if (timestampAttribute != null) {
- attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
- }
-
- if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
- attributes.put(idAttribute, outgoingMessage.recordId);
- }
-
+ pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
pubsubMessages.add(pubsubMessage);
}
PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
@@ -162,6 +146,22 @@ class PubsubJsonClient extends PubsubClient {
return response.getMessageIds().size();
}
+ private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) {
+ Map<String, String> attributes = null;
+ if (outgoingMessage.attributes == null) {
+ attributes = new TreeMap<>();
+ } else {
+ attributes = new TreeMap<>(outgoingMessage.attributes);
+ }
+ if (timestampAttribute != null) {
+ attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ }
+ if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ attributes.put(idAttribute, outgoingMessage.recordId);
+ }
+ return attributes;
+ }
+
@Override
public List<IncomingMessage> pull(
long requestTimeMsSinceEpoch,
http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index 578f814..cbb24f2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -136,4 +136,57 @@ public class PubsubJsonClientTest {
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
+
+ @Test
+ public void publishOneMessageWithOnlyTimestampAndIdAttributes() throws IOException {
+ String expectedTopic = TOPIC.getPath();
+ PubsubMessage expectedPubsubMessage = new PubsubMessage()
+ .encodeData(DATA.getBytes())
+ .setAttributes(
+ ImmutableMap.<String, String> builder()
+ .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+ .put(ID_ATTRIBUTE, RECORD_ID).build());
+ PublishRequest expectedRequest = new PublishRequest()
+ .setMessages(ImmutableList.of(expectedPubsubMessage));
+ PublishResponse expectedResponse = new PublishResponse()
+ .setMessageIds(ImmutableList.of(MESSAGE_ID));
+ Mockito.when((Object) (mockPubsub.projects()
+ .topics()
+ .publish(expectedTopic, expectedRequest)
+ .execute()))
+ .thenReturn(expectedResponse);
+ OutgoingMessage actualMessage = new OutgoingMessage(
+ DATA.getBytes(), ImmutableMap.<String, String>of(), MESSAGE_TIME, RECORD_ID);
+ int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+ assertEquals(1, n);
+ }
+
+ @Test
+ public void publishOneMessageWithNoTimestampOrIdAttribute() throws IOException {
+ // For this test, create a new PubsubJsonClient without the timestamp attribute
+ // or id attribute set.
+ client = new PubsubJsonClient(null, null, mockPubsub);
+
+ String expectedTopic = TOPIC.getPath();
+ PubsubMessage expectedPubsubMessage = new PubsubMessage()
+ .encodeData(DATA.getBytes())
+ .setAttributes(
+ ImmutableMap.<String, String> builder()
+ .put("k", "v").build());
+ PublishRequest expectedRequest = new PublishRequest()
+ .setMessages(ImmutableList.of(expectedPubsubMessage));
+ PublishResponse expectedResponse = new PublishResponse()
+ .setMessageIds(ImmutableList.of(MESSAGE_ID));
+ Mockito.when((Object) (mockPubsub.projects()
+ .topics()
+ .publish(expectedTopic, expectedRequest)
+ .execute()))
+ .thenReturn(expectedResponse);
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("k", "v");
+ OutgoingMessage actualMessage = new OutgoingMessage(
+ DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
+ int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+ assertEquals(1, n);
+ }
}
[2/2] beam git commit: This closes #3581
Posted by al...@apache.org.
This closes #3581
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3a96bf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3a96bf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3a96bf3
Branch: refs/heads/master
Commit: c3a96bf3d9d40f7636d8ac62326aa75e93c0b11c
Parents: 30886ac f77a858
Author: Ahmet Altay <al...@google.com>
Authored: Wed Nov 15 15:44:32 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Nov 15 15:44:32 2017 -0800
----------------------------------------------------------------------
.../sdk/io/gcp/pubsub/PubsubJsonClient.java | 34 ++++++-------
.../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 53 ++++++++++++++++++++
2 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------