Posted to commits@beam.apache.org by "Jan Hicken (JIRA)" <ji...@apache.org> on 2018/07/19 15:33:00 UTC
[jira] [Created] (BEAM-4829) Reduce Pub/Sub publishing latency
Jan Hicken created BEAM-4829:
--------------------------------
Summary: Reduce Pub/Sub publishing latency
Key: BEAM-4829
URL: https://issues.apache.org/jira/browse/BEAM-4829
Project: Beam
Issue Type: Improvement
Components: io-java-gcp
Affects Versions: 2.5.0
Reporter: Jan Hicken
Assignee: Chamikara Jayalath
The current implementation of {{PubsubUnboundedSink}} uses a global window with a trigger that fires after a fixed batch size of 1000 elements or a processing-time delay of 2 seconds. Elements are randomly distributed across 100 shards and grouped with a {{GroupByKey}} transform, and each resulting batch is passed to a {{DoFn}} that performs the actual publishing step.
For low-latency use cases (tens or hundreds of milliseconds), this design performs poorly: the transform steps described above add around 1.2 seconds of latency.
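To illustrate why a count-or-delay trigger adds so much latency at low throughput, here is a simplified, stdlib-only model. The class {{TriggerLatencyModel}} and its methods are hypothetical and are not Beam code. It assumes a single shard, evenly spaced arrivals, and no shuffle, processing or network overhead, and it computes how long each element waits before its batch fires.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of a "fire after N elements or D ms after the first element" trigger. */
final class TriggerLatencyModel {
  /** Arrival times (ms) of {@code count} elements spaced {@code spacingMillis} apart. */
  static long[] evenArrivals(int count, long spacingMillis) {
    long[] arrivals = new long[count];
    for (int i = 0; i < count; i++) {
      arrivals[i] = i * spacingMillis;
    }
    return arrivals;
  }

  /**
   * Returns each element's wait before its batch fires. A batch fires once it holds
   * {@code batchSize} elements, or {@code delayMillis} after its first element, whichever
   * comes first. {@code arrivalMillis} must be sorted in non-decreasing order.
   */
  static List<Long> waitTimes(long[] arrivalMillis, int batchSize, long delayMillis) {
    List<Long> waits = new ArrayList<>();
    int start = 0; // index of the first element of the current batch
    while (start < arrivalMillis.length) {
      long deadline = arrivalMillis[start] + delayMillis;
      int end = start; // exclusive end of the current batch
      while (end < arrivalMillis.length
          && end - start < batchSize
          && arrivalMillis[end] <= deadline) {
        end++;
      }
      // A full batch fires when its last element arrives; otherwise the delay fires it.
      long fireTime = (end - start == batchSize) ? arrivalMillis[end - 1] : deadline;
      for (int i = start; i < end; i++) {
        waits.add(fireTime - arrivalMillis[i]);
      }
      start = end;
    }
    return waits;
  }

  static double mean(List<Long> values) {
    long sum = 0;
    for (long v : values) {
      sum += v;
    }
    return (double) sum / values.size();
  }

  public static void main(String[] args) {
    long[] arrivals = evenArrivals(50, 100); // 10 elements/s for 5 seconds
    List<Long> slow = waitTimes(arrivals, 1000, 2000); // 1000 elements or 2 s
    List<Long> fast = waitTimes(arrivals, 1000, 50);   // 1000 elements or 50 ms
    System.out.printf("2 s delay:   mean %.1f ms, max %d ms%n", mean(slow), Collections.max(slow));
    System.out.printf("50 ms delay: mean %.1f ms, max %d ms%n", mean(fast), Collections.max(fast));
  }
}
{code}

With these inputs the model gives a mean wait of 1104.0 ms (maximum 2000 ms) for the 2-second delay, and exactly 50 ms for every element with a 50 ms delay. At 10 elements/s the count threshold of 1000 is never reached, so the delay alone decides the latency. Spreading the same input over 100 shards only makes the count threshold even less likely to fire.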
There are several ways to improve the Pub/Sub sink, for example:
Make the parameters above configurable via {{PipelineOptions}}:
* {{pubsubBatchSize}}: Approx. maximum number of elements in a Pub/Sub publishing batch
* {{pubsubDelayThreshold}}: Max. processing-time delay before the sharding window's trigger fires
* {{pubsubShardCount}}: The number of shards to create before publishing
This would allow the Pub/Sub sink to be tuned for different throughput and message-size scenarios in the pipeline.
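How the three proposed options interact can be sketched with a small, stdlib-only class. {{PubsubSinkSettings}} and {{approxFireDelayMillis}} are hypothetical names, not part of Beam. The estimate assumes a steady input rate spread evenly over the random shards and ignores shuffle overhead. The "tuned" values (100 elements, 50 ms, 10 shards) are arbitrary illustrations.

{code:java}
/**
 * Hypothetical holder for the three proposed options (not an existing Beam class), plus a rough
 * estimate of how long a shard's pane waits before its trigger fires.
 */
final class PubsubSinkSettings {
  final int batchSize;             // proposed pubsubBatchSize
  final long delayThresholdMillis; // proposed pubsubDelayThreshold
  final int shardCount;            // proposed pubsubShardCount

  PubsubSinkSettings(int batchSize, long delayThresholdMillis, int shardCount) {
    if (batchSize <= 0 || delayThresholdMillis <= 0 || shardCount <= 0) {
      throw new IllegalArgumentException("all settings must be positive");
    }
    this.batchSize = batchSize;
    this.delayThresholdMillis = delayThresholdMillis;
    this.shardCount = shardCount;
  }

  /** The values the current sink uses, as described above: 1000 elements, 2 s, 100 shards. */
  static PubsubSinkSettings current() {
    return new PubsubSinkSettings(1000, 2000, 100);
  }

  /**
   * Approximate wait (ms) from a pane's first element until its trigger fires. Each shard
   * receives about elementsPerSecond / shardCount elements per second, so filling a batch takes
   * about batchSize * shardCount / elementsPerSecond seconds; the delay threshold caps that wait.
   */
  double approxFireDelayMillis(double elementsPerSecond) {
    double fillMillis = 1000.0 * batchSize * shardCount / elementsPerSecond;
    return Math.min(fillMillis, delayThresholdMillis);
  }

  public static void main(String[] args) {
    PubsubSinkSettings current = current();
    PubsubSinkSettings tuned = new PubsubSinkSettings(100, 50, 10); // hypothetical tuning
    for (double rate : new double[] {100, 1_000_000}) {
      System.out.printf("%,.0f elements/s: current %.0f ms, tuned %.0f ms%n",
          rate, current.approxFireDelayMillis(rate), tuned.approxFireDelayMillis(rate));
    }
  }
}
{code}

At 100 elements/s with the current values, each shard sees only about one element per second, so the 2-second delay rather than the 1000-element count decides when a pane fires (estimate: 2000 ms). At 1,000,000 elements/s the count threshold is reached first (estimate: 100 ms).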
However, if the throughput is low (< 100 elements/s), this mechanism is still quite slow. The Java client in {{com.google.cloud:google-cloud-pubsub}} offers an alternative: its {{Publisher}} class supports a wide range of options to tune its batching behaviour. Using it would remove the need for the window and {{GroupByKey}} step and let the publisher itself handle the batching.
Consider the following {{DoFn}} for publishing messages to Pub/Sub using that client:
{code:java}
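import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.threeten.bp.Duration;
class PublishFn extends DoFn<PubsubMessage, Void> {
  private transient Publisher publisher;
  private transient List<ApiFuture<String>> pendingPublishes;
  private final ValueProvider<String> topicPath; // e.g. "projects/<project>/topics/<topic>"
  public PublishFn(final ValueProvider<String> topicPath) {
    this.topicPath = topicPath;
  }
  @Setup
  public void setup() throws IOException {
    pendingPublishes = new ArrayList<>();
    publisher = Publisher.newBuilder(TopicName.parse(topicPath.get()))
        .setBatchingSettings(BatchingSettings.newBuilder()
            .setRequestByteThreshold(40000L)          // send a batch at 40,000 bytes,
            .setElementCountThreshold(1000L)          // or at 1000 messages,
            .setDelayThreshold(Duration.ofMillis(50)) // or 50 ms after its first message
            .build())
        .build();
  }
  @ProcessElement
  public void processElement(final ProcessContext context) {
    // Publisher expects the protobuf PubsubMessage, so convert Beam's PubsubMessage first.
    final PubsubMessage message = context.element();
    final com.google.pubsub.v1.PubsubMessage.Builder proto = com.google.pubsub.v1.PubsubMessage
        .newBuilder().setData(ByteString.copyFrom(message.getPayload()));
    if (message.getAttributeMap() != null) {
      proto.putAllAttributes(message.getAttributeMap());
    }
    pendingPublishes.add(publisher.publish(proto.build()));
  }
  @FinishBundle
  public void finishBundle() throws Exception {
    final List<ApiFuture<String>> inFlight = new ArrayList<>(pendingPublishes);
    pendingPublishes.clear();
    ApiFutures.allAsList(inFlight).get(); // throws if any publish failed, failing the bundle
  }
  @Teardown
  public void teardown() throws Exception {
    publisher.shutdown();
  }
  @Override
  public void populateDisplayData(final DisplayData.Builder builder) {
    builder.add(DisplayData.item("topic", topicPath));
  }
}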
{code}
In a small test, this resulted in a publish latency of around 50–70 ms instead of 1000–1200 ms with the original {{PubsubUnboundedSink}}.
I understand that the windowing mechanism may give better performance and throughput when there are many elements per second. However, it would be useful to offer a "low-latency mode", using the code above as a starting point.