You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:00:09 UTC

[GitHub] [beam] kennknowles opened a new issue, #19013: Reduce Pub/Sub publishing latency

kennknowles opened a new issue, #19013:
URL: https://github.com/apache/beam/issues/19013

   The current implementation of the `PubsubUnboundedSink` uses a global window with a trigger on a fixed batch size of 1000 elements or a processing timespan of 2 seconds. After that, a random sharding of 100 is applied via a `GroupByKey` transform. The result is then pushed into a `DoFn` which performs the actual publishing step. 
   
   In case of low-latency (10s or 100s of milliseconds), this logic is quite bad, because it leads to a latency of  around 1.2 seconds, introduced by the transform steps described above.
   
   There are several possibilities to improve the Pub/Sub sink, for example:
   
   Let the upper parameters be configured via `PipelineOptions:`
    * `pubsubBatchSize`: Approx. maximum number of elements in a Pub/Sub publishing batch
    * `pubsubDelayThreshold`: Max. processing time duration before firing the sharding window
    * `pubsubShardCount`: The number of shards to create before publishing
   
   This would allow tweaking of the Pub/Sub sink for different scenarious of throughput and message size in the pipeline.
   
   However, if the throughput is small (< 100 element/s), this mechanism is still quite slow. If we take a look at the Java client at `com.google.cloud:google-cloud-pubsub`, the `Publisher` class supports a wide range of options to optimize its batching behaviour. This would allow not to rely on a window with group by key functionality and let the publisher itself handle the batching.
   
   Consider the following `DoFn` for publishing messages to Pub/Sub using that client:
   ```
   
   class PublishFn extends DoFn<PubsubMessage, Void> {
       private transient Publisher publisher;
   
   
      private final ValueProvider<String> topicPath;
   
       public PublishFn(final ValueProvider<String>
   topicPath) {
           this.topicPath = topicPath;
       }
   
       @Setup
       public void setup() throws
   IOException {
           publisher = Publisher.defaultBuilder(TopicName.parse(topicPath.get()))
       
              .setBatchingSettings(BatchingSettings.newBuilder()
                           .setRequestByteThreshold(40000L)
   
                          .setElementCountThreshold(1000L)
                           .setDelayThreshold(Duration.ofMillis(50))
   
                          .build())
                   .build();
       }
   
       @ProcessElement
       public
   void processElement(final ProcessContext context) {
           publisher.publish(context.element());
   
      }
   
       @Teardown
       public void teardown() throws Exception {
           publisher.shutdown();
   
      }
   
       @Override
       public void populateDisplayData(final DisplayData.Builder builder) {
     
        builder.add(DisplayData.item("topic", topicPath));
       }
   }
   
   ```
   
   In small test, this resulted in a publish latency of around 50 – 70 ms instead of 1000 – 1200 with the original `PubsubUnboundedSink`.
   
   I can understand, that the windowing mechanism could lead to better performance and throughput in a scenario with a high number of elements per second. However, it would be nice to enable a "low-latency-mode" using the provided code as an example.
   
   Imported from Jira [BEAM-4829](https://issues.apache.org/jira/browse/BEAM-4829). Original Jira may contain additional context.
   Reported by: JanHicken.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org