Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/10 13:19:51 UTC

[GitHub] lovelle opened a new pull request #3155: Deferred messages for consumers

URL: https://github.com/apache/pulsar/pull/3155
 
 
   ### Motivation
   
   Normally, messages are received by the consumer as soon as they are published
   by the producer. This feature makes it possible to configure a consumer
   subscription with an arbitrary receiver delay.
   With a receiver delay, a consumer only receives a message once the current
   time has passed the message's publish time plus the configured delay.
   Messages that are not yet ready to be delivered are scheduled for deferred
   delivery.
   
   This is helpful for systems that rely on the producer/consumer pattern for
   synchronisation or health checks. In such systems it is common for data to
   take some time to become available, for example because writes to persistent
   storage are buffered or because the data must first be distributed across
   the network, e.g. Elasticsearch.
   
   With this feature, users will be able to do this kind of work with ease,
   without resorting to hacks such as relying on re-delivery and retrying until
   some condition is eventually met.
   
   ### Modifications
   
   pulsar-broker
     - Add a DelayQueue field to store the next positions pending delivery.
   
     - Add a ScheduledFuture field to schedule a task for the head of the
       DelayQueue, which starts the delivery of expired entries.
   
     - Fix how the sendMessages method uses the received list of entries when
       those entries are filtered by the updatePermitsAndPendingAcks method.
       The received list may be modified by updatePermitsAndPendingAcks, and the
       entries are afterwards processed by a lambda passed to execute(), which
       will probably run on a thread other than the calling thread.
   
       Therefore, entries are now wrapped in a CopyOnWriteArrayList to prevent
       a ConcurrentModificationException in the lambda passed to execute().
       Alternative approaches would be to copy the entire list for the lambda or
       to use a synchronized list, but these would incur a performance penalty
       even when no entries are filtered; CopyOnWriteArrayList avoids this.
   
       A further step towards fixing this might be to use Streams and apply
       transformations to the inner list. This path was not taken because it
       would require major changes.
   
     - Add processDelayEntries() method to process all elements in the
       DelayQueue that are ready to be delivered. At any given time, only one
       task should be scheduled using this method.
   
     - Add readPublishFrom() method to read the publish time from the metadata
       of a message without changing its reference offset. This method is only
       used if the consumer has enabled the receiver delay parameter on the
       subscription.
   
     - Add inner private class DelayPositionInfo to represent each position to
       be scheduled in the DelayQueue.
   
     - Add method to clean up the previously mentioned fields related to
       deferred messages.
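
The ConcurrentModificationException concern described above can be illustrated with a minimal, single-threaded sketch. It is not the broker code: the class and method names are hypothetical, and the "filter" is simulated by removing an element while the list is being iterated. It only shows the JDK behaviour the fix relies on, namely that a CopyOnWriteArrayList iterator works on a snapshot while an ArrayList iterator fails fast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration, not code from the PR.
class SnapshotIterationSketch {

    // Iterates over the list and removes its first element part-way through,
    // mimicking a filter step that mutates the list while an iteration is pending.
    // Returns the number of elements visited, or -1 if the iteration failed
    // with a ConcurrentModificationException.
    static int visitWhileFiltering(List<String> entries) {
        int visited = 0;
        try {
            for (String entry : entries) {
                visited++;
                if (visited == 1) {
                    entries.remove(0); // structural change during iteration
                }
            }
        } catch (ConcurrentModificationException e) {
            return -1;
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(Arrays.asList("e1", "e2", "e3"));
        List<String> cow = new CopyOnWriteArrayList<>(Arrays.asList("e1", "e2", "e3"));
        System.out.println("ArrayList result: " + visitWhileFiltering(plain));
        System.out.println("CopyOnWriteArrayList result: " + visitWhileFiltering(cow));
    }
}
```

With the plain ArrayList the iteration fails fast, while the CopyOnWriteArrayList iteration still visits the three elements of its snapshot even though the list itself has shrunk to two.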
   
   DelayQueue from java.util.concurrent is used to store the position of each
   message that is next to expire. The advantage of using this queue is that at
   any given time one and only one task is scheduled per consumer, which avoids
   scheduling an unbounded number of tasks.
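
As a rough sketch of how a DelayQueue can back this design, the example below uses a hypothetical `DelayedPosition` element as a stand-in for DelayPositionInfo (its fields and the helper method names are assumptions, not the code in this PR). `pollExpired` plays the role of processDelayEntries(), and `millisUntilNext` returns the delay with which the single per-consumer task could be re-armed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not code from the PR.
class DelayQueueSketch {

    // Stand-in for DelayPositionInfo; field names are assumptions.
    static final class DelayedPosition implements Delayed {
        final long ledgerId;
        final long entryId;
        final long deliverAtMillis;

        DelayedPosition(long ledgerId, long entryId, long publishTimeMillis, long receiverDelayMillis) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.deliverAtMillis = publishTimeMillis + receiverDelayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Zero or negative once the position is due for delivery.
            return unit.convert(deliverAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayedPosition) {
                return Long.compare(deliverAtMillis, ((DelayedPosition) other).deliverAtMillis);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Removes and returns every position whose delay has already expired.
    static List<DelayedPosition> pollExpired(DelayQueue<DelayedPosition> queue) {
        List<DelayedPosition> ready = new ArrayList<>();
        DelayedPosition next;
        // poll() returns null when the head of the queue is not yet due.
        while ((next = queue.poll()) != null) {
            ready.add(next);
        }
        return ready;
    }

    // Time until the head becomes due, or -1 if the queue is empty.
    // A single scheduled task per consumer could be armed with this value.
    static long millisUntilNext(DelayQueue<DelayedPosition> queue) {
        // peek() returns the head even if it has not expired yet.
        DelayedPosition head = queue.peek();
        return head == null ? -1 : Math.max(0, head.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

Because only the head of the queue determines the next wake-up, one scheduled task is enough regardless of how many positions are pending.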
   
   pulsar-client
     - Add receiverDelay() method at subscription level to configure this parameter.
   
   pulsar-common
     - Set the receiver delay if it was configured by the user on the
       subscription.
   
     - Add an optional receiver delay parameter to the Pulsar protobuf schema;
       code generated by the generate_protobuf_docker.sh script.
   
   ### Result
   
   For consumers that have been configured with a receiver delay, the broker
   service will deliver only those messages that are ready to be delivered at
   that moment in time.
   Ready to be delivered means: `now() >= publishTime() + receiverDelay`
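
To make the readiness rule concrete, here is a small sketch that assumes, as described in the Motivation, that a message becomes deliverable once its publish time plus the receiver delay has passed. The class and method names are hypothetical and all times are in milliseconds.

```java
// Hypothetical illustration, not code from the PR.
class ReceiverDelaySketch {

    // True once the message is old enough to be handed to the consumer.
    static boolean isReadyToDeliver(long nowMillis, long publishTimeMillis, long receiverDelayMillis) {
        return nowMillis >= publishTimeMillis + receiverDelayMillis;
    }

    // How long the message still has to be withheld; 0 if it is already due.
    static long remainingMillis(long nowMillis, long publishTimeMillis, long receiverDelayMillis) {
        return Math.max(0L, publishTimeMillis + receiverDelayMillis - nowMillis);
    }
}
```

For example, a message published at t = 1000 with a receiver delay of 5000 is withheld at t = 5999 (1 ms remaining) and becomes deliverable from t = 6000 onwards.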
   
   ### Tests
   
   So far I have only tested this feature with an external application consuming
   messages with different receiver delays.
   
   If this feature is accepted, I will be pleased to add all the needed tests
   and to add receiver delay support for all supported languages.
   
   ### Future considerations
   
   An improvement could be made to the way each position is stored in the
   DelayQueue: instead of storing the position of each message, when messages
   are received sequentially (this should be the common case) a range of
   positions could be stored in a single DelayQueue element, reducing memory
   usage.
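
The range idea could look roughly like the sketch below. Everything in it is hypothetical (class names, the ledgerId/entryId representation of a position), and it deliberately leaves open how a whole range would map to a single expiry time in the DelayQueue; it only shows how sequential positions collapse into fewer stored elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not code from the PR.
class PositionRangeSketch {

    // A run of consecutive entries within one ledger.
    static final class Range {
        final long ledgerId;
        final long firstEntryId;
        long lastEntryId;

        Range(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.firstEntryId = entryId;
            this.lastEntryId = entryId;
        }

        long size() {
            return lastEntryId - firstEntryId + 1;
        }
    }

    private final List<Range> ranges = new ArrayList<>();

    // Extends the most recent range when the new position directly follows it,
    // otherwise starts a new range.
    void add(long ledgerId, long entryId) {
        if (!ranges.isEmpty()) {
            Range last = ranges.get(ranges.size() - 1);
            if (last.ledgerId == ledgerId && last.lastEntryId + 1 == entryId) {
                last.lastEntryId = entryId;
                return;
            }
        }
        ranges.add(new Range(ledgerId, entryId));
    }

    int rangeCount() {
        return ranges.size();
    }

    long positionCount() {
        long total = 0;
        for (Range r : ranges) {
            total += r.size();
        }
        return total;
    }
}
```

Adding the positions (1,0), (1,1), (1,2), (1,5) and (2,0) in that order stores five positions in three ranges.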
   
   This improvement was not made in order to keep this feature as simple as
   possible. If this feature is accepted, I would like to do this, provided the
   Pulsar community agrees.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services