Posted to commits@beam.apache.org by "Jan Hicken (JIRA)" <ji...@apache.org> on 2018/07/19 15:33:00 UTC

[jira] [Created] (BEAM-4829) Reduce Pub/Sub publishing latency

Jan Hicken created BEAM-4829:
--------------------------------

             Summary: Reduce Pub/Sub publishing latency
                 Key: BEAM-4829
                 URL: https://issues.apache.org/jira/browse/BEAM-4829
             Project: Beam
          Issue Type: Improvement
          Components: io-java-gcp
    Affects Versions: 2.5.0
            Reporter: Jan Hicken
            Assignee: Chamikara Jayalath


The current implementation of {{PubsubUnboundedSink}} uses a global window with a trigger that fires on a fixed batch size of 1000 elements or after a processing-time span of 2 seconds. After that, the elements are randomly distributed across 100 shards via a {{GroupByKey}} transform. The result is then passed to a {{DoFn}} which performs the actual publishing step.

For low-latency pipelines (tens or hundreds of milliseconds), this logic performs poorly: the transform steps described above add a latency of around 1.2 seconds.
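To illustrate where a delay of this order can come from, here is a deliberately simplified model in plain Java (no Beam involved). It assumes evenly spaced arrivals and a batch that fires either when it holds {{batchSize}} elements or when {{maxDelayMs}} has passed since its first element. The class and method names are hypothetical, and the model ignores the {{GroupByKey}} shuffle and the publish call itself, so it only approximates the behaviour of the real sink.

```java
// Simplified model, not Beam code: estimates how long an element waits
// in a count-or-timeout batch before the batch fires.
class BatchDelayModel {

    /**
     * Mean wait in milliseconds for evenly spaced arrivals. A batch fires when
     * it holds batchSize elements, or maxDelayMs after its first element
     * arrived, whichever happens first.
     */
    static double meanWaitMs(double elementsPerSecond, int batchSize,
                             long maxDelayMs, int totalElements) {
        double gapMs = 1000.0 / elementsPerSecond;
        double totalWaitMs = 0.0;
        int start = 0;
        while (start < totalElements) {
            double deadline = start * gapMs + maxDelayMs;
            int end = start; // exclusive index of the last element in this batch
            while (end < totalElements
                    && end - start < batchSize
                    && end * gapMs <= deadline) {
                end++;
            }
            // A full batch fires on arrival of its last element,
            // a partial batch fires when the timer expires.
            double fireTime = (end - start == batchSize) ? (end - 1) * gapMs : deadline;
            for (int k = start; k < end; k++) {
                totalWaitMs += fireTime - k * gapMs;
            }
            start = end;
        }
        return totalWaitMs / totalElements;
    }

    public static void main(String[] args) {
        // 50 elements/s with the sink's values (1000 elements or 2 s)
        System.out.println(meanWaitMs(50.0, 1000, 2000, 1010));
        // 50 elements/s with a 50 ms delay threshold
        System.out.println(meanWaitMs(50.0, 1000, 50, 3000));
    }
}
```

In this model, at 50 elements/s the count threshold is never reached, so every batch waits for the 2 second timer and the mean wait is about 1000 ms. With a 50 ms delay threshold the mean wait drops to 30 ms.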

There are several possibilities to improve the Pub/Sub sink, for example:

Let the parameters mentioned above be configured via {{PipelineOptions}}:
 * {{pubsubBatchSize}}: Approx. maximum number of elements in a Pub/Sub publishing batch
 * {{pubsubDelayThreshold}}: Max. processing time duration before firing the sharding window
 * {{pubsubShardCount}}: The number of shards to create before publishing

This would allow tuning the Pub/Sub sink for different scenarios of throughput and message size in the pipeline.
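As a rough sketch, the three options could be declared as a Beam {{PipelineOptions}} interface like the one below. The interface name {{PubsubSinkOptions}} is hypothetical, the defaults simply mirror the values currently used by the sink (1000 elements, 2 seconds, 100 shards), and expressing the delay threshold as a {{Long}} in milliseconds is an assumption rather than something the proposal specifies.

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical options interface; these options do not exist in Beam 2.5.0.
public interface PubsubSinkOptions extends PipelineOptions {

    @Description("Approximate maximum number of elements in a Pub/Sub publishing batch")
    @Default.Integer(1000)
    Integer getPubsubBatchSize();
    void setPubsubBatchSize(Integer value);

    // Assumption: the threshold is given in milliseconds.
    @Description("Maximum processing time in milliseconds before the sharding window fires")
    @Default.Long(2000L)
    Long getPubsubDelayThreshold();
    void setPubsubDelayThreshold(Long value);

    @Description("Number of shards to create before publishing")
    @Default.Integer(100)
    Integer getPubsubShardCount();
    void setPubsubShardCount(Integer value);
}
```

With such an interface, the values could be read through {{PipelineOptionsFactory.fromArgs(args).as(PubsubSinkOptions.class)}} and overridden on the command line, e.g. {{--pubsubBatchSize=100}}.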

However, if the throughput is small (< 100 elements/s), this mechanism is still quite slow. Looking at the Java client {{com.google.cloud:google-cloud-pubsub}}, the {{Publisher}} class supports a wide range of options to tune its batching behaviour. Using it would remove the need for a window with group-by-key functionality and let the publisher itself handle the batching.

Consider the following {{DoFn}} for publishing messages to Pub/Sub using that client:
{code:java}
class PublishFn extends DoFn<PubsubMessage, Void> {
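    // Publisher is not serializable, so it is created in setup() instead of being shipped with the DoFn.
    private transient Publisher publisher;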

    private final ValueProvider<String> topicPath;

    public PublishFn(final ValueProvider<String> topicPath) {
        this.topicPath = topicPath;
    }

    @Setup
    public void setup() throws IOException {
        publisher = Publisher.defaultBuilder(TopicName.parse(topicPath.get()))
                .setBatchingSettings(BatchingSettings.newBuilder()
                        .setRequestByteThreshold(40000L)
                        .setElementCountThreshold(1000L)
                        .setDelayThreshold(Duration.ofMillis(50))
                        .build())
                .build();
    }

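    @ProcessElement
    public void processElement(final ProcessContext context) {
        // publish() is asynchronous: the client batches messages according to the BatchingSettings from setup().
        // The returned future is ignored here, so publish failures are not surfaced.
        publisher.publish(context.element());
    }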

    @Teardown
    public void teardown() throws Exception {
        publisher.shutdown();
    }

    @Override
    public void populateDisplayData(final DisplayData.Builder builder) {
        builder.add(DisplayData.item("topic", topicPath));
    }
}
{code}
In a small test, this resulted in a publish latency of around 50 – 70 ms instead of 1000 – 1200 ms with the original {{PubsubUnboundedSink}}.

I understand that the windowing mechanism could lead to better performance and throughput in a scenario with a high number of elements per second. However, it would be nice to be able to enable a "low-latency mode", using the code above as an example.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)