You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/11/15 23:44:43 UTC

[1/2] beam git commit: Fixed adding timestamp and id attributes to pubsub messages

Repository: beam
Updated Branches:
  refs/heads/master 30886acee -> c3a96bf3d


Fixed adding timestamp and id attributes to pubsub messages


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f77a8580
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f77a8580
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f77a8580

Branch: refs/heads/master
Commit: f77a8580700b1cd79e0c824f171cccd8e2f332b3
Parents: 30886ac
Author: Nigel Kilmer <nk...@google.com>
Authored: Mon Jul 17 18:09:57 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Nov 15 15:44:23 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     | 34 ++++++-------
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 53 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index b745422..ab08813 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -135,23 +135,7 @@ class PubsubJsonClient extends PubsubClient {
     List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
       PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
-      Map<String, String> attributes = outgoingMessage.attributes;
-      if ((timestampAttribute != null || idAttribute != null) && attributes == null) {
-        attributes = new TreeMap<>();
-      }
-      if (attributes != null) {
-        pubsubMessage.setAttributes(attributes);
-      }
-
-      if (timestampAttribute != null) {
-        attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        attributes.put(idAttribute, outgoingMessage.recordId);
-      }
-
+      pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
       pubsubMessages.add(pubsubMessage);
     }
     PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
@@ -162,6 +146,22 @@ class PubsubJsonClient extends PubsubClient {
     return response.getMessageIds().size();
   }
 
+  private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) {
+    Map<String, String> attributes = null;
+    if (outgoingMessage.attributes == null) {
+      attributes = new TreeMap<>();
+    } else {
+      attributes = new TreeMap<>(outgoingMessage.attributes);
+    }
+    if (timestampAttribute != null) {
+      attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+    }
+    if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+      attributes.put(idAttribute, outgoingMessage.recordId);
+    }
+    return attributes;
+  }
+
   @Override
   public List<IncomingMessage> pull(
       long requestTimeMsSinceEpoch,

http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index 578f814..cbb24f2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -136,4 +136,57 @@ public class PubsubJsonClientTest {
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
+
+  @Test
+  public void publishOneMessageWithOnlyTimestampAndIdAttributes() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.<String, String> builder()
+                    .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+                    .put(ID_ATTRIBUTE, RECORD_ID).build());
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when((Object) (mockPubsub.projects()
+                                .topics()
+                                .publish(expectedTopic, expectedRequest)
+                                .execute()))
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(
+        DATA.getBytes(), ImmutableMap.<String, String>of(), MESSAGE_TIME, RECORD_ID);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+
+  @Test
+  public void publishOneMessageWithNoTimestampOrIdAttribute() throws IOException {
+    // For this test, create a new PubsubJsonClient without the timestamp attribute
+    // or id attribute set.
+    client = new PubsubJsonClient(null, null, mockPubsub);
+
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.<String, String> builder()
+                    .put("k", "v").build());
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when((Object) (mockPubsub.projects()
+                                .topics()
+                                .publish(expectedTopic, expectedRequest)
+                                .execute()))
+           .thenReturn(expectedResponse);
+    Map<String, String> attrs = new HashMap<>();
+    attrs.put("k", "v");
+    OutgoingMessage actualMessage = new OutgoingMessage(
+            DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
 }


[2/2] beam git commit: This closes #3581

Posted by al...@apache.org.
This closes #3581


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3a96bf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3a96bf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3a96bf3

Branch: refs/heads/master
Commit: c3a96bf3d9d40f7636d8ac62326aa75e93c0b11c
Parents: 30886ac f77a858
Author: Ahmet Altay <al...@google.com>
Authored: Wed Nov 15 15:44:32 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Nov 15 15:44:32 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     | 34 ++++++-------
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 53 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------