Posted to users@activemq.apache.org by jaikit <jk...@gmail.com> on 2013/03/16 06:04:41 UTC

Random slow Subscribers - causing Topic to full - Solution ?

I have a Topic with multiple clients subscribed to it, each running with the
default prefetch. The problem is that when one client is slow, it slows down
the other subscribers. I considered setting a low prefetch for the slow
consumer, but that does not work in my case: which clients are slow is random
and changes over time (depending on the backend they are communicating with),
so I cannot assign a low prefetch to any one client.

The solution I am thinking of prototyping:
I will keep one queue per subscriber. Given a Topic, a pool of threads will
remove events from the Topic and copy them to each subscriber's queue. Since
every subscriber has its own queue, the clients are independent of each
other. I will set a prefetch limit on each queue and, once that limit is
reached, drop the events. Drawback: each queue now requires its own memory.
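
The per-subscriber queue idea above can be sketched in plain Java. This is only an in-process model using java.util.concurrent, not ActiveMQ code; the class name BoundedFanOut and its methods are hypothetical names chosen for illustration, and the "limit" is modelled as a fixed queue capacity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Copies each event to one bounded queue per subscriber; a full queue drops the event. */
final class BoundedFanOut<E> {
    private final List<BlockingQueue<E>> queues = new ArrayList<>();
    private final int capacity;
    private long dropped;

    BoundedFanOut(int capacity) {
        this.capacity = capacity;
    }

    /** Registers a subscriber and returns the queue it should poll. */
    synchronized BlockingQueue<E> subscribe() {
        BlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity);
        queues.add(queue);
        return queue;
    }

    /** Offers the event to every subscriber queue without blocking the publisher. */
    synchronized void publish(E event) {
        for (BlockingQueue<E> queue : queues) {
            if (!queue.offer(event)) { // queue full: this subscriber is behind
                dropped++;             // drop the event for this subscriber only
            }
        }
    }

    /** Total number of events dropped across all subscribers. */
    synchronized long droppedCount() {
        return dropped;
    }
}
```

Because offer() never blocks, a slow subscriber only loses its own events while the others keep receiving. The memory cost is one bounded queue per subscriber, which is the drawback noted above.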

I would appreciate opinions on this approach, or any other solution that you
think might fit my case.

Thanks



--
View this message in context: http://activemq.2283324.n4.nabble.com/Random-slow-Subscribers-causing-Topic-to-full-Solution-tp4664784.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re:Re: Re:Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by SuoNayi <su...@163.com>.
You said, "But if one of my consumers takes a message and never acks it,
then messages are expired for both fast and slow consumers."
This can only happen when these consumers are all subscribed to the same queue...



At 2013-03-20 13:17:38,jaikit <jk...@gmail.com> wrote:
>No, both are subscribed to individual queues. I have a CompositeDestination
>set up, where messages are forwarded to both queues.

Re: Re:Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by jaikit <jk...@gmail.com>.
No, both are subscribed to individual queues. I have a CompositeDestination
set up, where messages are forwarded to both queues.




Re:Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by SuoNayi <su...@163.com>.
I suppose your slow and fast consumers are both subscribed to the same queue.
Note that a message is dispatched to only one of the consumers.
If the slow consumer receives a message but never acks it, the fast consumers
never get a chance to process it.
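
To illustrate the point about dispatch, here is a toy model in plain Java of the difference between queue and topic semantics. It is not ActiveMQ code: the class DispatchModel is a hypothetical name, and round-robin dispatch is a simplifying assumption that ignores prefetch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of JMS dispatch semantics; not ActiveMQ code. */
final class DispatchModel {
    private DispatchModel() { }

    /** Queue semantics: each message goes to exactly one consumer (round-robin here). */
    static List<List<String>> dispatchQueue(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    /** Topic semantics: every subscriber gets its own copy of each message. */
    static List<List<String>> dispatchTopic(List<String> messages, int subscribers) {
        List<List<String>> received = emptyInboxes(subscribers);
        for (List<String> inbox : received) {
            inbox.addAll(messages);
        }
        return received;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }
}
```

In the queue case, a message handed to a consumer that never acks is simply unavailable to the other consumers of that same queue.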



At 2013-03-20 11:27:17,jaikit <jk...@gmail.com> wrote:
>I found an old issue describing the same problem (non-deterministic behaviour
>when collecting expired messages):
>https://issues.apache.org/jira/browse/AMQ-1112
>
>I am using build 5.8. Can someone please let me know whether there is a
>deterministic way of expiring messages?
>
>My scenario:
>I have 2 consumers and I put an infinite sleep in one of them. In that case
>the messages are never expired. Ideally the fast consumer should consume
>messages and the slow consumer should expire those messages.







--

Wangyin
SuoNayi2006@163.com 
 

Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by jaikit <jk...@gmail.com>.
I found an old issue describing the same problem (non-deterministic behaviour
when collecting expired messages):
https://issues.apache.org/jira/browse/AMQ-1112

I am using build 5.8. Can someone please let me know whether there is a
deterministic way of expiring messages?

My scenario:
I have 2 consumers and I put an infinite sleep in one of them. In that case
the messages are never expired. Ideally the fast consumer should consume
messages and the slow consumer should expire those messages.





Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by jaikit <jk...@gmail.com>.
Thanks for the answer. I tried setting up expiry and timeToLive, and it does
expire the messages.

But if one of my consumers takes a message and never acks it, then messages
are expired for both fast and slow consumers. What I would like is for at
least the fast consumer to receive the message, but unfortunately that is not
happening. Can you please suggest a solution for this?

I appreciate any help or pointers.

Thanks




Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by Gary Tully <ga...@gmail.com>.
Note: the only way to discard messages from a queue is to consume them (as
in, ack them) or to have them expire by setting a message expiry time.
The setPendingMessageLimitStrategy and setMessageEvictionStrategy policies
apply only to non-durable topic subscriptions.
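
As a rough illustration of the two exits described above (consume or expire), here is a toy model in plain Java. It is not ActiveMQ code; ExpiringQueue is a hypothetical class, and the explicit purgeExpired call stands in for the broker's periodic expiry check (the expireMessagesPeriod setting in the quoted configuration). With a real broker the expiry time would typically come from the JMS producer's setTimeToLive.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Toy model of a queue whose messages leave only by being consumed or by expiring. */
final class ExpiringQueue {
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();

    /** Enqueues a message that expires ttlMillis after nowMillis. */
    void send(String body, long nowMillis, long ttlMillis) {
        entries.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Consumes (acknowledges) the oldest unexpired message, or returns null if none. */
    String receive(long nowMillis) {
        purgeExpired(nowMillis);
        Entry entry = entries.pollFirst();
        return entry == null ? null : entry.body;
    }

    /** Expiry sweep; returns how many messages were discarded. */
    int purgeExpired(long nowMillis) {
        int removed = 0;
        for (Iterator<Entry> it = entries.iterator(); it.hasNext(); ) {
            if (it.next().expiresAtMillis <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

With one such queue per consumer, a fast consumer drains its own queue through receive, while a stalled consumer's queue is emptied only when the sweep runs after the expiry time has passed.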


On 18 March 2013 23:39, jaikit <jk...@gmail.com> wrote:

> Currently I have 2 consumers set up - which are consuming from Queues. I
> have disabled producerflowcontrol, setMemoryLimit to 1, queueprefetch = 10,
> topicprefetch = 10.
>
> I have added infinite sleep in consumer1 and I ran the load test with
> 900000 events. My expectation is consumer1 discarding all events and
> consumer2 consuming all events. However both the consumers are now blocked.
> Also all the events are saved in both the queue. Each queue size is 900000.
>
> I have pasted my code snippet below. Can some one please guide me. Thanks.



-- 
http://redhat.com
http://blog.garytully.com

Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by jaikit <jk...@gmail.com>.
Currently I have 2 consumers set up, both consuming from queues. I have
disabled producer flow control and set memoryLimit to 1, queuePrefetch = 10,
and topicPrefetch = 10.

I added an infinite sleep in consumer1 and ran a load test with 900000
events. My expectation was that consumer1 would discard all events and
consumer2 would consume all of them. However, both consumers are now blocked,
and all the events are retained in both queues: each queue's size is 900000.

I have pasted my code snippet below. Can someone please guide me? Thanks.

        import java.util.ArrayList;
        import java.util.List;

        import org.apache.activemq.broker.BrokerService;
        import org.apache.activemq.broker.region.DestinationInterceptor;
        import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
        import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
        import org.apache.activemq.broker.region.policy.PolicyEntry;
        import org.apache.activemq.broker.region.policy.PolicyMap;
        import org.apache.activemq.broker.region.virtual.VirtualDestination;
        import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
        import org.apache.activemq.broker.region.virtual.VirtualTopic;

        /* configure activemq broker (declaration assumed; it was not part of the original snippet) */
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(true);
        broker.setAdvisorySupport(false);

        PolicyMap policyMap = new PolicyMap();
        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();

        /* policy for all topics */
        PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setTopic(">");
        topicPolicy.setProducerFlowControl(false);
        entries.add(topicPolicy);

        /* policy for all queues */
        PolicyEntry queuePolicy = new PolicyEntry();
        /* if this is true and consumers are slow, producers will be throttled and, in the worst case, halted */
        queuePolicy.setProducerFlowControl(false);
        /* apply this policy to all queues */
        queuePolicy.setQueue(">");
        queuePolicy.setMemoryLimit(1);

        ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy =
                new ConstantPendingMessageLimitStrategy();
        constantPendingMessageLimitStrategy.setLimit(1);
        queuePolicy.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy);

        OldestMessageEvictionStrategy oldestMessageEvictionStrategy = new OldestMessageEvictionStrategy();
        oldestMessageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1);
        queuePolicy.setMessageEvictionStrategy(oldestMessageEvictionStrategy);

        /* do not send an advisory message when a consumer is deemed slow */
        queuePolicy.setAdvisoryForSlowConsumers(false);
        /* the period (in ms) of checks for message expiry on queued messages; a value of 0 disables the check */
        queuePolicy.setExpireMessagesPeriod(1000);
        /* set the prefetch size for all queues; this value can be overridden when creating a consumer */
        // topicPolicy.setTopicPrefetch(10);
        queuePolicy.setQueuePrefetch(10);
        entries.add(queuePolicy);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);

        /* All undeliverable messages are sent to ActiveMQ.DLQ, which has a fixed size. If it reaches
         * that size, producers will be throttled. Drop the dead letter queue; enable it case by case. */
        // DiscardingDLQBrokerPlugin dlqBrokerPlugin = new DiscardingDLQBrokerPlugin();
        // dlqBrokerPlugin.setDropAll(true);
        // dlqBrokerPlugin.setDropTemporaryTopics(true);
        // dlqBrokerPlugin.setDropTemporaryQueues(true);
        // BrokerPlugin[] plugins = { dlqBrokerPlugin };
        // broker.setPlugins(plugins);

        VirtualTopic virtualTopic = new VirtualTopic();
        /* the new config that enables selectors on the interceptor */
        virtualTopic.setSelectorAware(true);
        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
        interceptor.setVirtualDestinations(new VirtualDestination[] { virtualTopic });
        broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
        broker.start();





Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by jaikit <jk...@gmail.com>.
I was able to forward messages from Topics to Queues using VirtualDestination,
but I cannot figure out how to configure my consumers to drop messages when
they are consuming slowly.

I got the snippet below from the ActiveMQ documentation:

  Non-durable queues
  A slow consumer is not really an issue with queues. But all of the consumers
  being slow is. In this case we eventually block the producer until messages
  are consumed.
  Other options could be to
  - spool messages to disk
  - discard messages

How should I configure queues to discard messages? I appreciate any help!

Thanks




Re: Random slow Subscribers - causing Topic to full - Solution ?

Posted by Christian Posta <ch...@gmail.com>.
Yep. Confirmed. That's what it does.
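
For readers trying this out, a small sketch of the destination names involved, assuming ActiveMQ's default Virtual Topic naming convention (topics named VirtualTopic.<name>, per-consumer queues named Consumer.<id>.VirtualTopic.<name>). The helper class VirtualTopicNames and the example names "Orders" and "A" are illustrative only; a broker configured with a custom prefix or name pattern would differ.

```java
/** Builds destination names under ActiveMQ's default Virtual Topic naming convention. */
final class VirtualTopicNames {
    private VirtualTopicNames() { }

    /** Topic that producers publish to, e.g. "VirtualTopic.Orders". */
    static String topicName(String name) {
        return "VirtualTopic." + name;
    }

    /** Queue that one logical consumer reads from, e.g. "Consumer.A.VirtualTopic.Orders". */
    static String consumerQueueName(String consumerId, String name) {
        return "Consumer." + consumerId + "." + topicName(name);
    }
}
```

A producer would publish to the topic name, and each logical subscriber would consume from its own queue name, so a slow subscriber only backs up its own queue.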

On Sunday, March 17, 2013, jaikit wrote:

> If Virtual Topic "create queues for each topic subscriber and replicates
> messages to each one", then this is exactly what I need. Can you confirm
> whether Virtual Topic physically replicates messages to a queue for each
> topic subscriber?
>
> Thanks for the pointer.


-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: Re:Random slow Subscribers - causing Topic to full - Solution ?

Posted by jaikit <jk...@gmail.com>.
If Virtual Topic "create queues for each topic subscriber and replicates
messages to each one", then this is exactly what I need. Can you confirm
whether Virtual Topic physically replicates messages to a queue for each
topic subscriber?

Thanks for the pointer.




Re:Random slow Subscribers - causing Topic to full - Solution ?

Posted by SuoNayi <su...@163.com>.
I think you should consider Virtual Topics:
http://activemq.apache.org/virtual-destinations.html








At 2013-03-16 13:04:41,jaikit <jk...@gmail.com> wrote:
>
>I have a Topic with multiple clients subscribed to it, each running with the
>default prefetch. The problem is that when one client is slow, it slows down
>the other subscribers. I considered setting a low prefetch for the slow
>consumer, but that does not work in my case: which clients are slow is random
>and changes over time (depending on the backend they are communicating with),
>so I cannot assign a low prefetch to any one client.
>
>The solution I am thinking of prototyping:
>I will keep one queue per subscriber. Given a Topic, a pool of threads will
>remove events from the Topic and copy them to each subscriber's queue. Since
>every subscriber has its own queue, the clients are independent of each
>other. I will set a prefetch limit on each queue and, once that limit is
>reached, drop the events. Drawback: each queue now requires its own memory.
>
>I would appreciate opinions on this approach, or any other solution that you
>think might fit my case.
>
>Thanks