You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/31 03:16:11 UTC
[1/2] beam git commit: Fix PubSubIO write attribute issue
Repository: beam
Updated Branches:
refs/heads/master b1c287bd5 -> 1e2ad65f8
Fix PubSubIO write attribute issue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad9df5b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad9df5b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad9df5b5
Branch: refs/heads/master
Commit: ad9df5b5591ce9d153039ac91e8862af6ea42b45
Parents: b1c287b
Author: Chen Bin <bc...@talend.com>
Authored: Thu Mar 9 11:09:04 2017 +0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Mar 30 20:15:58 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/PubsubJsonClient.java | 2 +-
.../org/apache/beam/sdk/util/PubsubJsonClientTest.java | 13 ++++++++++---
2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ad9df5b5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
index 6bc104f..ef8abfd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
@@ -135,7 +135,7 @@ public class PubsubJsonClient extends PubsubClient {
for (OutgoingMessage outgoingMessage : outgoingMessages) {
PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
- Map<String, String> attributes = pubsubMessage.getAttributes();
+ Map<String, String> attributes = outgoingMessage.attributes;
if ((timestampLabel != null || idLabel != null) && attributes == null) {
attributes = new TreeMap<>();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ad9df5b5/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
index 17e1870..019190b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
@@ -30,7 +30,10 @@ import com.google.api.services.pubsub.model.ReceivedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
@@ -114,8 +117,10 @@ public class PubsubJsonClientTest {
PubsubMessage expectedPubsubMessage = new PubsubMessage()
.encodeData(DATA.getBytes())
.setAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID));
+ ImmutableMap.<String, String> builder()
+ .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
+ .put(ID_LABEL, RECORD_ID)
+ .put("k", "v").build());
PublishRequest expectedRequest = new PublishRequest()
.setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse = new PublishResponse()
@@ -125,8 +130,10 @@ public class PubsubJsonClientTest {
.publish(expectedTopic, expectedRequest)
.execute()))
.thenReturn(expectedResponse);
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("k", "v");
OutgoingMessage actualMessage = new OutgoingMessage(
- DATA.getBytes(), null, MESSAGE_TIME, RECORD_ID);
+ DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
[2/2] beam git commit: This closes #2209
Posted by dh...@apache.org.
This closes #2209
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e2ad65f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e2ad65f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e2ad65f
Branch: refs/heads/master
Commit: 1e2ad65f80df4d21e25df51c53414054799d803e
Parents: b1c287b ad9df5b
Author: Dan Halperin <dh...@google.com>
Authored: Thu Mar 30 20:16:01 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Mar 30 20:16:01 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/PubsubJsonClient.java | 2 +-
.../org/apache/beam/sdk/util/PubsubJsonClientTest.java | 13 ++++++++++---
2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------