Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/10 13:19:51 UTC
[GitHub] lovelle opened a new pull request #3155: Deferred messages for
consumers
URL: https://github.com/apache/pulsar/pull/3155
### Motivation
Normally, messages are received by the consumer as soon as they are published by
the producer. This feature makes it possible to configure a consumer subscription
with an arbitrary receiver delay.
Receiver delay means that consumers only receive a message once the configured
delay has elapsed since its publish time. Messages that are not yet ready to be
delivered are scheduled for deferred delivery.
This is helpful for systems relying on the producer/consumer pattern for
synchronisation or health checks. In such systems it is common for data to take
some time to be committed to persistent storage before it becomes available,
for example because of a buffering mechanism or because the data must first be
distributed across the network (e.g. Elasticsearch).
With this feature, users can handle this kind of work easily, without ugly hacks
such as triggering redelivery and re-testing until some condition is met after a
while.
### Modifications
pulsar-broker
- Add a DelayQueue field to store the positions of messages pending delivery.
- Add a ScheduledFuture field that schedules a task for the head of the
DelayQueue, which starts delivering the entries whose delay has expired.
- Fix how the sendMessages method uses the received list of entries when those
entries are filtered by the updatePermitsAndPendingAcks method.
The received list of entries may be filtered by updatePermitsAndPendingAcks,
and afterwards the entries are processed by a lambda passed to execute(), which
will most likely run on a thread other than the calling thread.
Therefore the entries are now wrapped in a CopyOnWriteArrayList to prevent a
ConcurrentModificationException in the lambda passed to execute(). Another
approach would be to copy the entire list for the lambda or to use a
synchronized list, but this would incur a performance penalty even when entries
are not being filtered; CopyOnWriteArrayList avoids this.
A further step towards fixing this might be to use Streams and apply
transformations to the inner list. This path was not taken because it would
require major changes.
- Add a processDelayEntries() method that processes all elements in the
DelayQueue that are ready to be delivered; at any given time, only one task
should be scheduled with this method.
- Add a readPublishFrom() method that reads the publish time from a message's
metadata without changing its reader offset. This method is only used if the
consumer has enabled the receiver delay parameter on the subscription.
- Add a private inner class, DelayPositionInfo, to represent each position
scheduled in the DelayQueue.
- Add a method to clean up the previously mentioned fields related to deferred
messages.
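The sendMessages fix in the list above relies on the snapshot semantics of CopyOnWriteArrayList iterators. The minimal, single-threaded sketch below (the class and method names are hypothetical, not from the PR) removes an element after an iterator has been created: an ArrayList iterator fails fast with ConcurrentModificationException, while a CopyOnWriteArrayList iterator keeps traversing the snapshot it was created from. The same iterator semantics apply when the traversal happens in a task handed to execute() on another thread; the single thread only keeps the demo deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

public class SnapshotIterationDemo {

    // Creates an iterator, then removes `filtered` from the list (a structural change,
    // standing in for filtering by updatePermitsAndPendingAcks), then finishes the traversal.
    // Returns how many elements the iterator visited, or -1 if it threw
    // ConcurrentModificationException.
    static int iterateWhileFiltering(List<String> entries, String filtered) {
        int visited = 0;
        try {
            Iterator<String> it = entries.iterator();
            entries.remove(filtered);
            while (it.hasNext()) {
                it.next();
                visited++;
            }
        } catch (ConcurrentModificationException e) {
            return -1;
        }
        return visited;
    }
}
```

With an `ArrayList` of `"a", "b", "c"` the call returns -1; with a `CopyOnWriteArrayList` holding the same elements it returns 3, because the iterator still sees the pre-removal snapshot. As a design note, CopyOnWriteArrayList pays for this by copying its backing array on every mutation.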
DelayQueue from java.util.concurrent is used to store the position of each
message until its delay expires. The advantage of using this queue is that at
any given time at most one task is scheduled per consumer, which avoids
scheduling an unbounded number of tasks.
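The DelayQueue mechanism described above can be sketched as follows. This is not the PR's code: positions are assumed to be hypothetical (ledgerId, entryId) pairs, and delivery is handed to a hypothetical `dispatcher` callback. DelayPositionInfo implements `Delayed` so that `DelayQueue.drainTo()` only returns expired positions, and a single `ScheduledFuture` per consumer is re-armed for the current head of the queue, which also illustrates processDelayEntries() and the clean-up method from the list of modifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DeferredDelivery {

    // Hypothetical stand-in for DelayPositionInfo: a position plus its deliver-at time.
    static final class DelayPositionInfo implements Delayed {
        final long ledgerId;
        final long entryId;
        final long deliverAtMillis; // publishTime + receiverDelay

        DelayPositionInfo(long ledgerId, long entryId, long deliverAtMillis) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.deliverAtMillis = deliverAtMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deliverAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayPositionInfo) {
                return Long.compare(deliverAtMillis, ((DelayPositionInfo) other).deliverAtMillis);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<DelayPositionInfo> delayQueue = new DelayQueue<>();
    private final ScheduledExecutorService scheduler;
    private final Consumer<List<DelayPositionInfo>> dispatcher; // hypothetical delivery hook
    private ScheduledFuture<?> pendingTask; // at most one live task per consumer

    public DeferredDelivery(ScheduledExecutorService scheduler, Consumer<List<DelayPositionInfo>> dispatcher) {
        this.scheduler = scheduler;
        this.dispatcher = dispatcher;
    }

    // Defers a position; re-arms the task only if none is pending or this became the head.
    public synchronized void defer(DelayPositionInfo info) {
        delayQueue.add(info);
        if (pendingTask == null || delayQueue.peek() == info) {
            scheduleHead();
        }
    }

    // Delivers every expired position, then schedules a task for the next head.
    public synchronized void processDelayEntries() {
        List<DelayPositionInfo> ready = new ArrayList<>();
        delayQueue.drainTo(ready); // drainTo only transfers expired elements
        if (!ready.isEmpty()) {
            dispatcher.accept(ready);
        }
        scheduleHead();
    }

    // Replaces any pending task with one timed for the current head of the queue.
    private void scheduleHead() {
        if (pendingTask != null) {
            pendingTask.cancel(false);
            pendingTask = null;
        }
        DelayPositionInfo head = delayQueue.peek(); // earliest position, expired or not
        if (head != null) {
            long delayMs = Math.max(0L, head.getDelay(TimeUnit.MILLISECONDS));
            pendingTask = scheduler.schedule(this::processDelayEntries, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    // Clean-up: cancels the pending task and forgets all deferred positions.
    public synchronized void clear() {
        if (pendingTask != null) {
            pendingTask.cancel(false);
            pendingTask = null;
        }
        delayQueue.clear();
    }
}
```

Because every path that schedules work first cancels the previous future, the number of pending tasks stays at most one per consumer no matter how many positions are waiting.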
pulsar-client
- Add a receiverDelay() method at the subscription level to configure this
parameter.
pulsar-common
- Set the receiver delay if it was configured by the user on the subscription.
- Add an optional receiver delay parameter to the Pulsar protobuf schema; the
code was generated by the generate_protobuf_docker.sh script.
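The client-side flow from the two lists above (configure the delay on the subscription, then send it only if it was configured) can be sketched as below. SubscriptionSettings and SubscribeCommand are hypothetical stand-ins, not Pulsar's real consumer builder or its protobuf-generated command class, and expressing the delay in milliseconds is an assumption.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class ReceiverDelayConfigSketch {

    // Hypothetical client-side subscription settings (not Pulsar's real ConsumerBuilder).
    static final class SubscriptionSettings {
        private Long delayMillis; // null means the user never configured a delay

        SubscriptionSettings receiverDelay(long delay, TimeUnit unit) {
            if (delay < 0) {
                throw new IllegalArgumentException("receiver delay must not be negative");
            }
            this.delayMillis = unit.toMillis(delay);
            return this;
        }

        Optional<Long> receiverDelayMillis() {
            return Optional.ofNullable(delayMillis);
        }
    }

    // Hypothetical stand-in for a protobuf-generated command with an optional field.
    static final class SubscribeCommand {
        private long receiverDelay;
        private boolean hasReceiverDelay;

        void setReceiverDelay(long receiverDelayMillis) {
            this.receiverDelay = receiverDelayMillis;
            this.hasReceiverDelay = true;
        }

        boolean hasReceiverDelay() {
            return hasReceiverDelay;
        }

        long getReceiverDelay() {
            return receiverDelay;
        }
    }

    // Sets the optional field only when the user configured a receiver delay,
    // so subscriptions without one leave the field unset.
    static SubscribeCommand buildSubscribe(SubscriptionSettings settings) {
        SubscribeCommand command = new SubscribeCommand();
        settings.receiverDelayMillis().ifPresent(command::setReceiverDelay);
        return command;
    }
}
```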
### Result
For consumers configured with a receiver delay, the broker service delivers only
those messages that are ready to be delivered at a given moment in time.
Ready to deliver means: `now() >= publishTime() + receiverDelay`
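A message is ready once the receiver delay has elapsed since its publish time. The helper below expresses that condition; its names and the use of milliseconds are assumptions for illustration. For example, a message published at t = 10,000 ms on a subscription with a 5,000 ms receiver delay is not ready at t = 12,000 ms (3,000 ms remain) and becomes ready at t = 15,000 ms.

```java
public class ReceiverDelayCheck {

    // Ready once the receiver delay has elapsed since publishing:
    // now >= publishTime + receiverDelay (all values in milliseconds).
    static boolean isReadyToDeliver(long nowMillis, long publishTimeMillis, long receiverDelayMillis) {
        return nowMillis >= publishTimeMillis + receiverDelayMillis;
    }

    // Time left before the message may be delivered; 0 if it is already ready.
    // This is the kind of value a deferred entry would be scheduled with.
    static long remainingDelayMillis(long nowMillis, long publishTimeMillis, long receiverDelayMillis) {
        return Math.max(0L, publishTimeMillis + receiverDelayMillis - nowMillis);
    }
}
```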
### Tests
So far I have only tested this feature with an external application consuming
messages with different receiver delay values.
If this feature is accepted, I will be glad to add all the necessary tests and to
add receiver delay support to all supported client languages.
### Future considerations
The way positions are stored in the DelayQueue could be improved: instead of
storing the position of each message, when messages are received sequentially
(which should be the common case), a range of positions could be stored in a
single DelayQueue element, reducing memory usage.
This improvement was not made in order to keep this feature as simple as
possible; if the feature is accepted, I would like to implement it if the Pulsar
community agrees.
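A minimal sketch of the range idea, under stated assumptions: positions are hypothetical (ledgerId, entryId) pairs, an entry is contiguous with a range when it is in the same ledger and its entry ID directly follows the range, and a range is released once its newest entry is ready. That release rule never delivers a message early, but it may hold older entries in a range slightly longer than their own delay.

```java
import java.util.Deque;

public class PositionRangeSketch {

    // Hypothetical DelayQueue element: a contiguous run of entries in one ledger.
    static final class DelayRange {
        final long ledgerId;
        final long firstEntryId;
        long lastEntryId;
        long deliverAtMillis; // deliver-at time of the newest entry in the range

        DelayRange(long ledgerId, long entryId, long deliverAtMillis) {
            this.ledgerId = ledgerId;
            this.firstEntryId = entryId;
            this.lastEntryId = entryId;
            this.deliverAtMillis = deliverAtMillis;
        }

        // Contiguous means same ledger and the entry directly follows the range.
        boolean canExtendWith(long ledgerId, long entryId) {
            return this.ledgerId == ledgerId && entryId == lastEntryId + 1;
        }

        long size() {
            return lastEntryId - firstEntryId + 1;
        }
    }

    // Adds a position: extends the newest range when contiguous, otherwise opens a new one.
    static void add(Deque<DelayRange> ranges, long ledgerId, long entryId, long deliverAtMillis) {
        DelayRange tail = ranges.peekLast();
        if (tail != null && tail.canExtendWith(ledgerId, entryId)) {
            tail.lastEntryId = entryId;
            // Keep the latest deliver-at time so no entry in the range is released early.
            tail.deliverAtMillis = Math.max(tail.deliverAtMillis, deliverAtMillis);
        } else {
            ranges.addLast(new DelayRange(ledgerId, entryId, deliverAtMillis));
        }
    }
}
```

The ranges are built in a plain Deque here; to use them with a DelayQueue they would also need to implement `Delayed`, and a range should not be extended after it has been added to the queue, because DelayQueue positions elements by priority when they are inserted.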