You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Christopher Reilly (JIRA)" <ji...@apache.org> on 2017/04/28 15:19:04 UTC

[jira] [Created] (BEAM-2116) PubsubJsonClient doesn't write user created attributeMap

Christopher Reilly created BEAM-2116:
----------------------------------------

             Summary: PubsubJsonClient doesn't write user created attributeMap
                 Key: BEAM-2116
                 URL: https://issues.apache.org/jira/browse/BEAM-2116
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-gcp
    Affects Versions: 0.6.0
         Environment: Java Google Dataflow
            Reporter: Christopher Reilly
            Assignee: Daniel Halperin
            Priority: Minor


PubsubJsonClient, which seems to be the hard coded client for PubsubIO.write() doesn't seem to be respecting the attributes set by the user for the PubsubMessage. 

In the PubsubJsonClient.publish() method, the passed in OutgoingMessage that contains the user set attribute map never actually has it's attributes map read. Instead, a new PubsubMessage is instantiated and the empty attributesMap from that is used. This is fixed in the PubsubGrpcClient, but that client type is never used by default in any way. 


public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
      throws IOException {
    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
    for (OutgoingMessage outgoingMessage : outgoingMessages) {
      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);

      Map<String, String> attributes = pubsubMessage.getAttributes();
      if ((timestampLabel != null || idLabel != null) && attributes == null) {
        attributes = new TreeMap<>();
      }
      if (attributes != null) {
        pubsubMessage.setAttributes(attributes);
      }

Please let me know if I am going down the wrong path here.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)