Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:42:38 UTC

[GitHub] [beam] damccorm opened a new issue, #21162: Add ability to Write to GCP PubSub with an orderingKey

damccorm opened a new issue, #21162:
URL: https://github.com/apache/beam/issues/21162

   There was a prior dev-list email thread about PubSub ordering keys[1], but that was in reference to reads. It should be possible to support PubSub writes with an ordering key as well. For example, we might be able to have users supply a serializable function that extracts the ordering key from each message.
   
   [1] http://mail-archives.apache.org/mod_mbox/beam-dev/202010.mbox/%3CCAGAbUe93TcwnC+fyAkE3MmtNANi-oFUXPQbg-gDyP6oB8eLTkA@mail.gmail.com%3E
   
   Imported from Jira [BEAM-13148](https://issues.apache.org/jira/browse/BEAM-13148). Original Jira may contain additional context.
   Reported by: egalpin.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370128144

   I tried the above and also this:
   ```java
   cr.registerCoderForType(TypeDescriptor.of(PubsubMessage.class), new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder());
   ```
   but nothing changed.




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1369333569

   Yes, publishing messages with an ordering key is now supported in the Java SDK as well; it was added by https://github.com/apache/beam/pull/22216. I believe this should be included in SDK v2.43.0 and higher.
   
   Ordering keys are supported through PubsubIO.writeMessages[1], where messages need to be instantiated using the constructor[2] that accepts an ordering key. It's also worth noting that Pub/Sub writes can make use of regional endpoints[3], which might be required for your use case to fully ensure ordering.
   
   [1] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.html#writeMessages--
   [2] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.html
   [3] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Write.html#withPubsubRootUrl-java.lang.String-




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370134940

   Thanks for confirming. I haven't yet had time to trace/validate. One other option you could try, depending on your level of comfort, would be to modify the Beam source and vendor in the PubsubIO module. I noted my steps for doing so, specifically with Dataflow, previously: https://lists.apache.org/thread/c6929ms0bjxtcw9ho4tdb5y3t8wnwnfy. This is not a long-term solution, but rather a stopgap until the same change can be made in the official SDK as per https://github.com/apache/beam/issues/23525. Given that setting the coder on the PCollection appears to be insufficient, the priority of https://github.com/apache/beam/issues/23525 is increased in my opinion.
   
   You'd want to modify the PubsubCoderProviderRegistrar[1] such that the `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder` appears first in the list (or as the sole list element), then follow the vendoring steps mentioned.
   
   [1] https://github.com/apache/beam/blob/9a22bf68767b8fa27aa381d6ff84e381a21af5a4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
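   
   To illustrate why the position in the registrar's list matters, here is a minimal, self-contained sketch of a first-match lookup. It is not Beam's `CoderRegistry`: `Provider`, `lookup`, and `PubsubMessageStandIn` are hypothetical names, and the first-match rule is an assumption based on the description above rather than a statement about Beam's actual implementation.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Optional;

   public class FirstMatchRegistry {

       /** Hypothetical marker type standing in for Beam's PubsubMessage. */
       static final class PubsubMessageStandIn {}

       /** Hypothetical provider: offers one named coder for one type. */
       static final class Provider {
           final Class<?> type;
           final String coderName;

           Provider(Class<?> type, String coderName) {
               this.type = type;
               this.coderName = coderName;
           }
       }

       /** Returns the coder name of the first provider registered for the type (assumed rule). */
       static Optional<String> lookup(List<Provider> providers, Class<?> type) {
           for (Provider p : providers) {
               if (p.type.equals(type)) {
                   return Optional.of(p.coderName);
               }
           }
           return Optional.empty();
       }

       public static void main(String[] args) {
           Provider attributesOnly =
               new Provider(PubsubMessageStandIn.class, "PubsubMessageWithAttributesCoder");
           Provider withOrderingKey =
               new Provider(
                   PubsubMessageStandIn.class,
                   "PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder");

           // Attributes-only provider listed first: it is the one that gets picked.
           List<Provider> current = Arrays.asList(attributesOnly, withOrderingKey);
           System.out.println(lookup(current, PubsubMessageStandIn.class).get());

           // Ordering-key provider moved to the front: it is picked instead.
           List<Provider> reordered = Arrays.asList(withOrderingKey, attributesOnly);
           System.out.println(lookup(reordered, PubsubMessageStandIn.class).get());
       }
   }
   ```
   
   Under that assumption, moving the ordering-key coder to the front (or making it the sole element) is what changes the default that gets selected.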




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370145645

   It is unbounded, but I was using the test pipeline for the last few examples I tried here. I didn't attempt to run those on Dataflow.




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1371259467

   Thanks @codertimu, I can reproduce the issue of the lost orderingKey locally as well, using something like this:
   
   ```java
           pipeline.apply(
               Create.of(
                   new PubsubMessage(
                       "{\"baz\": \"jazz\"}".getBytes(StandardCharsets.UTF_8),
                       Collections.singletonMap("foo", "bar"),
                       null,
                       "1")))
           .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of());
           
           ...
   ```
   
   Attaching a debugger and stepping through, I can see that the Coder in `Create` is `PubsubMessageWithAttributesCoder` (the first element in the [PubsubCoderProviderRegistrar](https://github.com/apache/beam/blob/9a22bf68767b8fa27aa381d6ff84e381a21af5a4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java)).
   
   This obviously needs to be fixed, and I believe it will be fixed by altering the registrar to use `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder` as the default (i.e. fixing https://github.com/apache/beam/issues/23525). If you're able to, could you verify whether this behaviour persists with any runner other than the test runner?
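   
   As a rough illustration of how the lost ordering key described above can arise, the following self-contained sketch round-trips a message through two toy coders. None of these classes are Beam's: `Message`, `SimpleCoder`, `PayloadOnlyCoder`, and `OrderingKeyCoder` are hypothetical stand-ins, and the wire format is invented for the example. The point is only that a field the coder never writes cannot come back on decode.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.UncheckedIOException;

   public class OrderingKeyRoundTrip {

       /** Hypothetical stand-in for a Pub/Sub message; not Beam's PubsubMessage. */
       static final class Message {
           final String payload;
           final String orderingKey; // may be null

           Message(String payload, String orderingKey) {
               this.payload = payload;
               this.orderingKey = orderingKey;
           }
       }

       /** Minimal coder contract, loosely modelled on the idea of a Beam Coder. */
       interface SimpleCoder {
           void encode(Message m, DataOutputStream out) throws IOException;

           Message decode(DataInputStream in) throws IOException;
       }

       /** Writes the payload only, so the ordering key is dropped on encode. */
       static final class PayloadOnlyCoder implements SimpleCoder {
           public void encode(Message m, DataOutputStream out) throws IOException {
               out.writeUTF(m.payload);
           }

           public Message decode(DataInputStream in) throws IOException {
               return new Message(in.readUTF(), null);
           }
       }

       /** Writes the payload plus a presence flag and the nullable ordering key. */
       static final class OrderingKeyCoder implements SimpleCoder {
           public void encode(Message m, DataOutputStream out) throws IOException {
               out.writeUTF(m.payload);
               out.writeBoolean(m.orderingKey != null);
               if (m.orderingKey != null) {
                   out.writeUTF(m.orderingKey);
               }
           }

           public Message decode(DataInputStream in) throws IOException {
               String payload = in.readUTF();
               String key = in.readBoolean() ? in.readUTF() : null;
               return new Message(payload, key);
           }
       }

       /** Encodes and then decodes a message, as happens between pipeline stages. */
       static Message roundTrip(SimpleCoder coder, Message m) {
           try {
               ByteArrayOutputStream buf = new ByteArrayOutputStream();
               DataOutputStream out = new DataOutputStream(buf);
               coder.encode(m, out);
               out.flush();
               return coder.decode(
                   new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       public static void main(String[] args) {
           Message original = new Message("{\"baz\": \"jazz\"}", "1");
           // Prints "null": the key was never written.
           System.out.println(roundTrip(new PayloadOnlyCoder(), original).orderingKey);
           // Prints "1": the key survives the round trip.
           System.out.println(roundTrip(new OrderingKeyCoder(), original).orderingKey);
       }
   }
   ```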




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1368959035

   Any updates on this? The Python API has this, but the Java API lacks it. Can it be prioritised?




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1369940644

   Yes, good callout; this is exactly the issue described in https://github.com/apache/beam/issues/23525. You can safely and properly circumvent the issue by using `setCoder` on the PCollection of Pub/Sub messages that have ordering keys, specifying `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder` as the coder.




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370739409

   @egalpin just to clarify, in my test pipeline PubsubIO is not involved at all. Even in a simple PTransform, the orderingKey and messageId are not preserved in the PCollection.




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1369943303

   Note that, as far as I understand, any messageId set on a Pub/Sub message to be published will be dropped before publishing, because message IDs are assigned on the server side of Pub/Sub (again, this is my own understanding and may not be factual).




[GitHub] [beam] damccorm commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
damccorm commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1369250938

   @egalpin may have context




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370113175

   This is indeed correct usage, and I was under the impression that this should work. Based on what you're saying/seeing, I'm suspicious that the coder might not be propagated through the PubsubIO/write code path.  I'll try to trace the code to confirm and/or validate behaviour as well.
   
   @codertimu you could try to set the default coder for the pubsub message class[1] to see if that will unblock you: 
   
   ```java
   PipelineOptions options = PipelineOptionsFactory.create();
   Pipeline p = Pipeline.create(options);
   
   CoderRegistry cr = p.getCoderRegistry();
   cr.registerCoder(PubsubMessage.class, PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.class);
   ```
   
   [1] https://beam.apache.org/documentation/programming-guide/#setting-default-coder




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1371316282

   I confirm that the issue persists with the dataflow runner as well.




[GitHub] [beam] egalpin commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370136035

   @codertimu can you confirm whether your pipeline is bounded or unbounded? It may help me trace




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1369559192

   Interesting. I used 2.43 with the constructor that takes messageId and orderingKey, but neither the messageId nor the orderingKey showed up in Pub/Sub. I am using the Dataflow runner.
   
   Could it be related to #23525 ?




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1369982435

   I used `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder` on the PCollection of Pub/Sub messages, but `messageId` (which I don't care about) and `orderingKey` are still null.
   
   I used it as follows:
   ```java
   results.setCoder(new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder())
                   .apply("Write Result updates to PubSub", PubsubIO.writeMessages().to(options.getLitResultTopic()));
   ```
   Am I not using it correctly?




[GitHub] [beam] codertimu commented on issue #21162: Add ability to Write to GCP PubSub with an orderingKey

Posted by GitBox <gi...@apache.org>.
codertimu commented on issue #21162:
URL: https://github.com/apache/beam/issues/21162#issuecomment-1370120655

   @egalpin registerCoder does not exist. Do you mean like this:
   ```java
   cr.registerCoderForClass(PubsubMessage.class, new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder());
   ```

