Posted to dev@activemq.apache.org by David Sitsky <si...@nuix.com> on 2008/02/08 18:11:34 UTC

Optimising PrefetchSubscription.dispatchPending() ideas

In my application, I have noticed with 20 consumers, the broker's CPU is 
going through the roof, with many threads in 
PrefetchSubscription.dispatchPending().  With my consumers, it might be 
500-1000 messages dispatched before a commit() can be called.  With 20 
consumers, this means there can be a build-up of 20,000 uncommitted 
messages lying around the system, let alone the new messages which are 
being pumped into the system at a furious rate.  Not nice I know, but I 
don't have much choice about it at the moment, for application-specific 
reasons.

As you can imagine, I can have some very big pending queue sizes - 
sometimes 100,000 in size.

I am experimenting with different prefetch sizes which may help, but I 
suspect every time a prefetch thread is trying to dispatch a message, it 
might have to iterate through very large numbers of deleted messages or 
messages which have been claimed by other subscribers before it finds a 
matching message.  Multiply this by 20, and there is a lot of CPU being 
consumed.  This worries me for scalability reasons - if I want to keep 
bumping up the number of consumers.

I'm not sure what the best way of improving this is... is it possible 
when we call dispatchPending() to not call pendingMessageCursor.reset() 
perhaps?

I'm trying to understand why we need to reset the cursor, when 
presumably all of the messages we have gone over before in a previous 
dispatchPending() call are either deleted, dispatched or locked by 
another node, and therefore don't need to be checked again (or we check 
if we reach the end of the cursor list)?

I realise that if a transaction is rolled back, a message that was 
previously locked by another consumer may be freed.  There are probably 
message ordering issues too.

Is it possible when we are iterating through the cursor if we find a 
node locked by another consumer to perhaps move it to the end of the 
cursor (or another list) and check it only if we found no matches?
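
Very roughly, the shape of what I have in mind is below (made-up names 
and stub methods - this is only a sketch of the idea, not the real 
PrefetchSubscription or cursor code):

    import java.util.LinkedList;
    import java.util.List;

    // Sketch only: park nodes owned by other consumers so they are not
    // rescanned on every dispatchPending() pass.
    public class ParkedListSketch {
        interface MessageReference { }
        interface Cursor {
            void reset();
            boolean hasNext();
            MessageReference next();
            void remove();
        }

        private final List<MessageReference> parked = new LinkedList<MessageReference>();

        void dispatchPendingSketch(Cursor pending) {
            pending.reset();
            while (pending.hasNext() && !isFull()) {
                MessageReference node = pending.next();
                if (lockedByAnotherConsumer(node)) {
                    pending.remove();
                    parked.add(node);   // park it instead of revisiting it every time
                } else if (canDispatch(node)) {
                    pending.remove();
                    dispatch(node);
                }
            }
            // Revisit the parked nodes only occasionally, e.g. after a rollback
            // may have freed some of them, or once the parked list grows large.
        }

        // Stubs standing in for the real subscription logic.
        boolean isFull() { return false; }
        boolean lockedByAnotherConsumer(MessageReference node) { return false; }
        boolean canDispatch(MessageReference node) { return true; }
        void dispatch(MessageReference node) { }
    }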

I'm sure there are a lot of complexities here I am not aware of - but I 
am curious what others think.

Doing this sort of change should reduce the latencies and CPU usage of 
the broker significantly.

Cheers,
David



Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by Jim Gomes <e....@gmail.com>.
That's great!  I would definitely like to see Topics improved, because my
testing shows that they are significantly slower than Queues.  Here are some
rough statistics that I have gathered so far.  I have more detailed data,
but this should give an idea of what I'm seeing.

*Scenario:*

   - Network bandwidth is 1 Gigabit LAN.
   - NMS OpenWire Producer on Machine A, sending 500,000 simple text
   messages using sendAsync=true.
   - ActiveMQ Broker on Machine B.
   - Four NMS OpenWire Consumers on Topic on Machine C.  One NMS OpenWire
   Consumer on Queue on Machine C.


*Results:*

   - The Queue message/second send rate is around 5,900.
   - The Topic message/second send rate is around 360.


This is a huge difference owing to the fact that ActiveMQ is slowing down
the Producer, even though it is sending asynchronously.  Sending to a topic
that has Consumers (none of them idle) is 16 times slower than sending to a
Queue that has a Consumer.

If I remove all Consumers on Machine C, then the Queue message/second send
rate jumps to 9,300, and the Topic message/second send rate goes to 7,700.
For whatever reasons, Topics are inherently slower than Queues.

I'll use this test scenario to measure any changes in performance.  If
anyone has suggestions on how to tune the broker, producers, or consumers to
get better speed, please let me know.
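
For reference, the producer setup above corresponds roughly to the 
following on the Java client (I am actually using NMS, so this is just 
the equivalent for comparison; the URL and topic name are placeholders):

    import javax.jms.Connection;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class AsyncSendSketch {
        public static void main(String[] args) throws Exception {
            ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://machine-b:61616");
            // Fire-and-forget sends, the equivalent of sendAsync=true in NMS.
            factory.setUseAsyncSend(true);

            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer =
                session.createProducer(session.createTopic("EXAMPLE.TOPIC"));
            for (int i = 0; i < 500000; i++) {
                TextMessage message = session.createTextMessage("simple text message " + i);
                producer.send(message);
            }
            connection.close();
        }
    }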

Regards,
Jim

On Wed, Feb 13, 2008 at 11:35 PM, Rob Davies <ra...@gmail.com> wrote:

>
> On Feb 14, 2008, at 3:08 AM, David Sitsky wrote:
>
> > Rob and I did some performance enhancements with queues so that a
> > Queue.send() call was decoupled from the dispatch processing.  In
> > the past, depending on the state of the consumers, a Queue.send()
> > call could take a significant amount of time.  We changed it so that
> > a single thread was responsible for dispatching messages, which
> > avoided a lot of lock contention.  It also meant a Queue.send()
> > returned as quickly as possible.
> >
> > I imagine a similar change could be done for Topics, since from what
> > I can tell, a Topic.send() call currently does its dispatch
> > processing in the same call.
> >
> > Cheers,
> > David
> >
> > Jim Gomes wrote:
> >> I am very interested in this set of changes.  I am currently
> >> load/performance testing ActiveMQ, and am very surprised at the
> >> results.
> >> Anything that can be done to speed this area is a good thing.  I
> >> have found
> >> a dramatic drop in performance when adding even a single consumer,
> >> especially to a Topic.  The producer to the Topic is slowed down
> >> quite a
> >> bit, which was a surprise to me.  I expected that the
> >> existence/non-existence or performance of a consumer would have no
> >> impact on
> >> a producer, but that is not the case.  A producer is directly
> >> impacted by
> >> any consumers, especially idle consumers.  An idle Topic consumer can
> >> actually cause a producer to block.  As far as I have been able to
> >> determine
> >> from browsing the documentation, this is by design.
> >> I am looking forward to your efforts in this area.
> >> Best Regards,
> >> Jim
> >
> > --
> > Cheers,
> > David
> >
> > Nuix Pty Ltd
> > Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280
> > 0699
> > Web: http://www.nuix.com                            Fax: +61 2 9212
> > 6902
>
>
> Yes - Topics next!
>
> cheers,
>
> Rob
>

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by Rob Davies <ra...@gmail.com>.
On Feb 14, 2008, at 3:08 AM, David Sitsky wrote:

> Rob and I did some performance enhancements with queues so that a  
> Queue.send() call was decoupled from the dispatch processing.  In  
> the past, depending on the state of the consumers, a Queue.send()  
> call could take a significant amount of time.  We changed it so that  
> a single thread was responsible for dispatching messages, which  
> avoided a lot of lock contention.  It also meant a Queue.send()  
> returned as quickly as possible.
>
> I imagine a similar change could be done for Topics, since from what  
> I can tell, a Topic.send() call currently does its dispatch  
> processing in the same call.
>
> Cheers,
> David
>
> Jim Gomes wrote:
>> I am very interested in this set of changes.  I am currently
>> load/performance testing ActiveMQ, and am very surprised at the  
>> results.
>> Anything that can be done to speed this area is a good thing.  I  
>> have found
>> a dramatic drop in performance when adding even a single consumer,
>> especially to a Topic.  The producer to the Topic is slowed down  
>> quite a
>> bit, which was a surprise to me.  I expected that the
>> existence/non-existence or performance of a consumer would have no  
>> impact on
>> a producer, but that is not the case.  A producer is directly  
>> impacted by
>> any consumers, especially idle consumers.  An idle Topic consumer can
>> actually cause a producer to block.  As far as I have been able to  
>> determine
>> from browsing the documentation, this is by design.
>> I am looking forward to your efforts in this area.
>> Best Regards,
>> Jim
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902


Yes - Topics next!

cheers,

Rob

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by David Sitsky <si...@nuix.com>.
Hi Mike,

If you are talking about non-persistent messages for queues, then yes, 
the performance should be much better.  If you are talking about topics 
- that is another story.  My application was primarily queue-based, so I 
wasn't concerned with the performance of topics.

Cheers,
David

Mike Kelley wrote:
> Hi David,
> 
> I'm new to ActiveMQ and unfamiliar with the code (albeit w/experience in
> JMS), so forgive me if this is a silly question: do the performance
> issues/fixes discussed affect non-durable subscriptions (w/non-persistent
> messages) as well?
> 
> Mike
> 
> 
> David Sitsky-2 wrote:
>> Rob and I did some performance enhancements with queues so that a 
>> Queue.send() call was decoupled from the dispatch processing.  In the 
>> past, depending on the state of the consumers, a Queue.send() call could 
>> take a significant amount of time.  We changed it so that a single 
>> thread was responsible for dispatching messages, which avoided a lot of 
>> lock contention.  It also meant a Queue.send() returned as quickly as 
>> possible.
>>
>> I imagine a similar change could be done for Topics, since from what I 
>> can tell, a Topic.send() call currently does its dispatch processing in 
>> the same call.
>>
>> Cheers,
>> David
>>
> 


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by Mike Kelley <mk...@rearden.com>.
Hi David,

I'm new to ActiveMQ and unfamiliar with the code (albeit w/experience in
JMS), so forgive me if this is a silly question: do the performance
issues/fixes discussed affect non-durable subscriptions (w/non-persistent
messages) as well?

Mike


David Sitsky-2 wrote:
> 
> Rob and I did some performance enhancements with queues so that a 
> Queue.send() call was decoupled from the dispatch processing.  In the 
> past, depending on the state of the consumers, a Queue.send() call could 
> take a significant amount of time.  We changed it so that a single 
> thread was responsible for dispatching messages, which avoided a lot of 
> lock contention.  It also meant a Queue.send() returned as quickly as 
> possible.
> 
> I imagine a similar change could be done for Topics, since from what I 
> can tell, a Topic.send() call currently does its dispatch processing in 
> the same call.
> 
> Cheers,
> David
> 

-- 
View this message in context: http://www.nabble.com/Optimising-PrefetchSubscription.dispatchPending%28%29-ideas-tp15359726s2354p16207231.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by David Sitsky <si...@nuix.com>.
Rob and I did some performance enhancements with queues so that a 
Queue.send() call was decoupled from the dispatch processing.  In the 
past, depending on the state of the consumers, a Queue.send() call could 
take a significant amount of time.  We changed it so that a single 
thread was responsible for dispatching messages, which avoided a lot of 
lock contention.  It also meant a Queue.send() returned as quickly as 
possible.
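
In outline, the change just hands the dispatch work to a dedicated 
thread so that send() returns immediately - something like the following 
in shape (made-up names; this is a sketch of the idea, not the actual 
Queue code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch of decoupling send() from dispatch processing.
    public class SketchQueue {
        // A single thread owns all dispatch work, so producers never contend
        // for the dispatch locks inside send().
        private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

        public void send(final Object message) {
            addToPendingCursor(message);          // make the message available to subscriptions
            dispatcher.execute(new Runnable() {   // wake the dispatch thread...
                public void run() {
                    dispatchPending();
                }
            });
            // ...and return to the producer straight away.
        }

        private void addToPendingCursor(Object message) { /* ... */ }
        private void dispatchPending() { /* ... */ }
    }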

I imagine a similar change could be done for Topics, since from what I 
can tell, a Topic.send() call currently does its dispatch processing in 
the same call.

Cheers,
David

Jim Gomes wrote:
> I am very interested in this set of changes.  I am currently
> load/performance testing ActiveMQ, and am very surprised at the results.
> Anything that can be done to speed this area is a good thing.  I have found
> a dramatic drop in performance when adding even a single consumer,
> especially to a Topic.  The producer to the Topic is slowed down quite a
> bit, which was a surprise to me.  I expected that the
> existence/non-existence or performance of a consumer would have no impact on
> a producer, but that is not the case.  A producer is directly impacted by
> any consumers, especially idle consumers.  An idle Topic consumer can
> actually cause a producer to block.  As far as I have been able to determine
> from browsing the documentation, this is by design.
> 
> I am looking forward to your efforts in this area.
> Best Regards,
> Jim

-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by Jim Gomes <e....@gmail.com>.
I am very interested in this set of changes.  I am currently
load/performance testing ActiveMQ, and am very surprised at the results.
Anything that can be done to speed this area is a good thing.  I have found
a dramatic drop in performance when adding even a single consumer,
especially to a Topic.  The producer to the Topic is slowed down quite a
bit, which was a surprise to me.  I expected that the
existence/non-existence or performance of a consumer would have no impact on
a producer, but that is not the case.  A producer is directly impacted by
any consumers, especially idle consumers.  An idle Topic consumer can
actually cause a producer to block.  As far as I have been able to determine
from browsing the documentation, this is by design.

I am looking forward to your efforts in this area.
Best Regards,
Jim

On Tue, Feb 12, 2008 at 10:39 PM, Rob Davies <ra...@gmail.com> wrote:

> Hi David,
>
> I think this is a valid patch. What I'm looking at at the moment is
> only adding messages to a QueueSubscriber's pending list which it can
> dispatch - rather than lots checking to see if they are able to
> dispatch it.
>
> cheers,
>
> Rob
> On Feb 13, 2008, at 6:22 AM, David Sitsky wrote:
>
> > Hi Rob,
> >
> > I changed the condition for when to check the "trash" list to:
> >
> >               if (count > 0 || trash.size() > 1000)
> >
> > this gave much better performance for a range of application data.
> > I've re-attached the patch again to avoid confusion.  As I said
> > before - the broker consumes far less CPU now than before, so I am
> > able to add a lot more consumers now.
> >
> > Any thoughts?  Are there better ways of implementing this?
> >
> > Cheers,
> > David
>

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by David Sitsky <si...@nuix.com>.
Hi Rob,

That sounds like another good optimisation.  I guess we probably still 
need both changes, since if a large number of messages are being 
injected into the system, we will still end up with a large number of 
messages on the pending lists for all subscribers.

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> I think this is a valid patch. What I'm looking at at the moment is only 
> adding messages to a QueueSubscriber's pending list which it can 
> dispatch - rather than lots of checking to see if it is able to dispatch
> them.
> 
> cheers,
> 
> Rob
> On Feb 13, 2008, at 6:22 AM, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I changed the condition for when to check the "trash" list to:
>>
>>               if (count > 0 || trash.size() > 1000)
>>
>> this gave much better performance for a range of application data.  
>> I've re-attached the patch again to avoid confusion.  As I said before 
>> - the broker consumes far less CPU now than before, so I am able to 
>> add a lot more consumers now.
>>
>> Any thoughts?  Are there better ways of implementing this?
>>
>> Cheers,
>> David
>>
>> David Sitsky wrote:
>>> Hi Rob,
>>> I was using a version that did have your most recent changes.
>>> To give you a better idea of what I meant, I hacked up some changes 
>>> which you can see from the attached patch.
>>> The idea is instead of going through the pending list performing the 
>>> same computations over and over again on messages which have 
>>> already been handled by other subscriptions, to move them to another 
>>> list.
>>> For a particular run, this reduced my application run-time from 47 
>>> minutes to 38 minutes.
>>> I'm sure there are better ways of implementing this - but do you see 
>>> what I mean?
>>> Cheers,
>>> David
>>> Rob Davies wrote:
>>>> David,
>>>>
>>>> which release are you working on?  There was a change last night in 
>>>> Queues that might affect the CPU usage.
>>>> On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
>>>>
>>>>> In my application, I have noticed with 20 consumers, the broker's 
>>>>> CPU is going through the roof, with many threads in 
>>>>> PrefetchSubscription.dispatchPending().  With my consumers, it 
>>>>> might be 500-1000 messages dispatched before a commit() can be 
>>>>> called.  With 20 consumers, this means there can be a build-up of 
>>>>> 20,000 uncommitted messages lying around the system, let alone the 
>>>>> new messages which are being pumped into the system at a furious 
>>>>> rate.  Not nice I know, but I don't have much choice about it at 
>>>>> the moment, for application-specific reasons.
>>>>>
>>>>> As you can imagine, I can have some very big pending queue sizes - 
>>>>> sometimes 100,000 in size.
>>>>>
>>>>> I am experimenting with different prefetch sizes which may help, 
>>>>> but I suspect every time a prefetch thread is trying to dispatch a 
>>>>> message, it might have to iterate through very large numbers of 
>>>>> deleted messages or messages which have been claimed by other 
>>>>> subscribers before it finds a matching message.  Multiply this by 
>>>>> 20, and there is a lot of CPU being consumed.  This worries me for 
>>>>> scalability reasons - if I want to keep bumping up the number of 
>>>>> consumers.
>>>>>
>>>>> I'm not sure what the best way of improving this is... is it 
>>>>> possible when we call dispatchPending() to not call 
>>>>> pendingMessageCursor.reset() perhaps?
>>>> reset() is a nop for the QueueStoreCursor :(
>>>>>
>>>>>
>>>>> I'm trying to understand why we need to reset the cursor, when 
>>>>> presumably all of the messages we have gone over before in a 
>>>>> previous dispatchPending() call are either deleted, dispatched or 
>>>>> locked by another node, and therefore don't need to be checked 
>>>>> again (or we check if we reach the end of the cursor list)?
>>>> I
>>>>>
>>>>>
>>>>> I realise if a transaction is rolled back, that a message that was 
>>>>> previously locked by another consumer may be freed.  There are 
>>>>> probably message ordering issues too.
>>>>>
>>>>> Is it possible when we are iterating through the cursor if we find 
>>>>> a node locked by another consumer to perhaps move it to the end of 
>>>>> the cursor (or another list) and check it only if we found no matches?
>>>>>
>>>>> I'm sure there are a lot of complexities here I am not aware of - 
>>>>> but I am curious what others think.
>>>>>
>>>>> Doing this sort of change should reduce the latencies and CPU usage 
>>>>> of the broker significantly.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>>
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>> ===================================================================
>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (revision 619666)
>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (working copy)
>> @@ -19,6 +19,7 @@
>> import java.io.IOException;
>> import java.util.ArrayList;
>> import java.util.Iterator;
>> +import java.util.LinkedList;
>> import java.util.List;
>> import java.util.concurrent.CopyOnWriteArrayList;
>>
>> @@ -54,6 +55,7 @@
>>
>>     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
>>     protected PendingMessageCursor pending;
>> +    protected List<MessageReference> trash = new LinkedList<MessageReference>();
>>     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
>>     protected int prefetchExtension;
>>     protected long enqueueCounter;
>> @@ -439,22 +449,41 @@
>>
>>     protected void dispatchPending() throws IOException {
>>         if (!isSlave()) {
>> +           int count = 0;
>>            synchronized(pendingLock) {
>>                 try {
>>                     int numberToDispatch = countBeforeFull();
>>                     if (numberToDispatch > 0) {
>>                         pending.setMaxBatchSize(numberToDispatch);
>> -                        int count = 0;
>>                         pending.reset();
>>                         while (pending.hasNext() && !isFull()
>>                                 && count < numberToDispatch) {
>>                             MessageReference node = pending.next();
>> +                            LockOwner lockOwner;
>>                             if (node == null) {
>>                                 break;
>>                             }
>>                             if(isDropped(node)) {
>>                                 pending.remove();
>>                             }
>> +                            else if (node instanceof QueueMessageReference &&
>> +                                     (((QueueMessageReference)node).isAcked()))
>> +                            {
>> +                                // Message has been acked.  Move it to the trash, since it
>> +                                // is unlikely to be dispatched to this subscription.
>> +                                pending.remove();
>> +                                trash.add(node);
>> +                            }
>> +                            else if (node instanceof IndirectMessageReference &&
>> +                                     (lockOwner = ((IndirectMessageReference)node).getLockOwner()) != null &&
>> +                                     lockOwner != this)
>> +                            {
>> +                                // Message which has been locked by another subscription.
>> +                                // Move it to the trash, since it is unlikely to be
>> +                                // dispatched to this subscription.
>> +                                pending.remove();
>> +                                trash.add(node);
>> +                            }
>>                             else if (canDispatch(node)) {
>>                                 pending.remove();
>>                                 // Message may have been sitting in the pending
>> @@ -475,7 +504,40 @@
>>                 } finally {
>>                     pending.release();
>>                 }
>> -            }
>> +
>> +               // Check if any trash can be cleaned up or some messages need to be placed
>> +               // back into the pending list.
>> +               if (count > 0 || trash.size() > 1000)
>> +               {
>> +                   for (Iterator<MessageReference> iter = trash.iterator(); iter.hasNext();)
>> +                   {
>> +                       MessageReference node = iter.next();
>> +                       if (isDropped(node))
>> +                       {
>> +                           // Message has been deleted, so it can be removed.
>> +                           iter.remove();
>> +                       }
>> +                       else if ((node instanceof QueueMessageReference &&
>> +                               !((QueueMessageReference) node).isAcked()) ||
>> +                                (node instanceof IndirectMessageReference &&
>> +                               ((IndirectMessageReference) node).getLockOwner() == null))
>> +                       {
>> +                           // Message is no longer acked or it is not locked by anyone
>> +                           // probably due to a rolledback transaction.  Re-inject it into
>> +                           // the pending list again.  This shouldn't be very common.
>> +                           try
>> +                           {
>> +                               pending.addMessageLast(node);
>> +                               iter.remove();
>> +                           }
>> +                           catch (Exception e)
>> +                           {
>> +                               throw new IOException("Unable to add message to pending list", e);
>> +                           }
>> +                       }
>> +                   }
>> +               }
>> +           }
>>         }
>>     }
>>


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by Rob Davies <ra...@gmail.com>.
Hi David,

I think this is a valid patch. What I'm looking at at the moment is  
only adding messages to a QueueSubscriber's pending list which it can  
dispatch - rather than lots of checking to see if it is able to
dispatch them.
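
Roughly the shape of it (made-up names and stubs - a sketch of the idea 
only, not the actual broker code):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: decide at enqueue time which subscription gets a message,
    // instead of every subscription re-checking it at dispatch time.
    public class EnqueueTimeDispatchSketch {
        interface QueueSubscription {
            boolean isFull();
            boolean canDispatch(Object node);
            void addToPending(Object node);
        }

        private final List<QueueSubscription> subscriptions = new ArrayList<QueueSubscription>();
        private final List<Object> heldBack = new ArrayList<Object>();

        public void messageArrived(Object node) {
            for (QueueSubscription sub : subscriptions) {
                if (!sub.isFull() && sub.canDispatch(node)) {
                    sub.addToPending(node);   // only this subscription ever scans it
                    return;
                }
            }
            heldBack.add(node);               // nobody can take it yet; retry later
        }
    }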

cheers,

Rob
On Feb 13, 2008, at 6:22 AM, David Sitsky wrote:

> Hi Rob,
>
> I changed the condition for when to check the "trash" list to:
>
>               if (count > 0 || trash.size() > 1000)
>
> this gave much better performance for a range of application data.   
> I've re-attached the patch again to avoid confusion.  As I said  
> before - the broker consumes far less CPU now than before, so I am  
> able to add a lot more consumers now.
>
> Any thoughts?  Are there better ways of implementing this?
>
> Cheers,
> David
>
> David Sitsky wrote:
>> Hi Rob,
>> I was using a version that did have your most recent changes.
>> To give you a better idea of what I meant, I hacked up some changes  
>> which you can see from the attached patch.
>> The idea is instead of going through the pending list performing  
>> the same computations over and over again on messages which have  
>> already been handled by other subscriptions, to move them to  
>> another list.
>> For a particular run, this reduced my application run-time from 47  
>> minutes to 38 minutes.
>> I'm sure there are better ways of implementing this - but do you  
>> see what I mean?
>> Cheers,
>> David
>> Rob Davies wrote:
>>> David,
>>>
>>> which release are you working on?  There was a change last night  
>>> in Queues that might affect the CPU usage.
>>> On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
>>>
>>>> In my application, I have noticed with 20 consumers, the broker's  
>>>> CPU is going through the roof, with many threads in  
>>>> PrefetchSubscription.dispatchPending().  With my consumers, it  
>>>> might be 500-1000 messages dispatched before a commit() can be  
>>>> called.  With 20 consumers, this means there can be a build-up of  
>>>> 20,000 uncommitted messages lying around the system, let alone the  
>>>> new messages which are being pumped into the system at a furious  
>>>> rate.  Not nice I know, but I don't have much choice about it at  
>>>> the moment, for application-specific reasons.
>>>>
>>>> As you can imagine, I can have some very big pending queue sizes  
>>>> - sometimes 100,000 in size.
>>>>
>>>> I am experimenting with different prefetch sizes which may help,  
>>>> but I suspect every time a prefetch thread is trying to dispatch  
>>>> a message, it might have to iterate through very large numbers of  
>>>> deleted messages or messages which have been claimed by other  
>>>> subscribers before it finds a matching message.  Multiply this by  
>>>> 20, and there is a lot of CPU being consumed.  This worries me  
>>>> for scalability reasons - if I want to keep bumping up the number  
>>>> of consumers.
>>>>
>>>> I'm not sure what the best way of improving this is... is it  
>>>> possible when we call dispatchPending() to not call  
>>>> pendingMessageCursor.reset() perhaps?
>>> reset() is a nop for the QueueStoreCursor :(
>>>>
>>>>
>>>> I'm trying to understand why we need to reset the cursor, when  
>>>> presumably all of the messages we have gone over before in a  
>>>> previous dispatchPending() call are either deleted, dispatched or  
>>>> locked by another node, and therefore don't need to be checked  
>>>> again (or we check if we reach the end of the cursor list)?
>>> I
>>>>
>>>>
>>>> I realise if a transaction is rolled back, that a message that  
>>>> was previously locked by another consumer may be freed.  There  
>>>> are probably message ordering issues too.
>>>>
>>>> Is it possible when we are iterating through the cursor if we  
>>>> find a node locked by another consumer to perhaps move it to the  
>>>> end of the cursor (or another list) and check it only if we found  
>>>> no matches?
>>>>
>>>> I'm sure there are a lot of complexities here I am not aware of -  
>>>> but I am curious what others think.
>>>>
>>>> Doing this sort of change should reduce the latencies and CPU  
>>>> usage of the broker significantly.
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>>
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 619666)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
> @@ -19,6 +19,7 @@
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.Iterator;
> +import java.util.LinkedList;
> import java.util.List;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> @@ -54,6 +55,7 @@
>
>     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
>     protected PendingMessageCursor pending;
> +    protected List<MessageReference> trash = new LinkedList<MessageReference>();
>     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
>     protected int prefetchExtension;
>     protected long enqueueCounter;
> @@ -439,22 +449,41 @@
>
>     protected void dispatchPending() throws IOException {
>         if (!isSlave()) {
> +           int count = 0;
>            synchronized(pendingLock) {
>                 try {
>                     int numberToDispatch = countBeforeFull();
>                     if (numberToDispatch > 0) {
>                         pending.setMaxBatchSize(numberToDispatch);
> -                        int count = 0;
>                         pending.reset();
>                         while (pending.hasNext() && !isFull()
>                                 && count < numberToDispatch) {
>                             MessageReference node = pending.next();
> +                            LockOwner lockOwner;
>                             if (node == null) {
>                                 break;
>                             }
>                             if(isDropped(node)) {
>                                 pending.remove();
>                             }
> +                            else if (node instanceof QueueMessageReference &&
> +                                     (((QueueMessageReference)node).isAcked()))
> +                            {
> +                                // Message has been acked.  Move it to the trash, since it
> +                                // is unlikely to be dispatched to this subscription.
> +                                pending.remove();
> +                                trash.add(node);
> +                            }
> +                            else if (node instanceof IndirectMessageReference &&
> +                                     (lockOwner = ((IndirectMessageReference)node).getLockOwner()) != null &&
> +                                     lockOwner != this)
> +                            {
> +                                // Message which has been locked by another subscription.
> +                                // Move it to the trash, since it is unlikely to be
> +                                // dispatched to this subscription.
> +                                pending.remove();
> +                                trash.add(node);
> +                            }
>                             else if (canDispatch(node)) {
>                                 pending.remove();
>                                 // Message may have been sitting in the pending
> @@ -475,7 +504,40 @@
>                 } finally {
>                     pending.release();
>                 }
> -            }
> +
> +               // Check if any trash can be cleaned up or some messages need to be placed
> +               // back into the pending list.
> +               if (count > 0 || trash.size() > 1000)
> +               {
> +                   for (Iterator<MessageReference> iter = trash.iterator(); iter.hasNext();)
> +                   {
> +                       MessageReference node = iter.next();
> +                       if (isDropped(node))
> +                       {
> +                           // Message has been deleted, so it can be removed.
> +                           iter.remove();
> +                       }
> +                       else if ((node instanceof QueueMessageReference &&
> +                               !((QueueMessageReference) node).isAcked()) ||
> +                                (node instanceof IndirectMessageReference &&
> +                               ((IndirectMessageReference) node).getLockOwner() == null))
> +                       {
> +                           // Message is no longer acked or it is not locked by anyone
> +                           // probably due to a rolledback transaction.  Re-inject it into
> +                           // the pending list again.  This shouldn't be very common.
> +                           try
> +                           {
> +                               pending.addMessageLast(node);
> +                               iter.remove();
> +                           }
> +                           catch (Exception e)
> +                           {
> +                               throw new IOException("Unable to add message to pending list", e);
> +                           }
> +                       }
> +                   }
> +               }
> +           }
>         }
>     }
>


Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by David Sitsky <si...@nuix.com>.
Hi Rob,

I changed the condition for when to check the "trash" list to:

                if (count > 0 || trash.size() > 1000)

this gave much better performance for a range of application data.  I've 
re-attached the patch again to avoid confusion.  As I said before - the 
broker consumes far less CPU now than before, so I am able to add a lot 
more consumers now.

Any thoughts?  Are there better ways of implementing this?

Cheers,
David

David Sitsky wrote:
> Hi Rob,
> 
> I was using a version that did have your most recent changes.
> 
> To give you a better idea of what I meant, I hacked up some changes 
> which you can see from the attached patch.
> 
> The idea is instead of going through the pending list performing the 
> same computations over and over again on messages which have 
> already been handled by other subscriptions, to move them to another list.
> 
> For a particular run, this reduced my application run-time from 47 
> minutes to 38 minutes.
> 
> I'm sure there are better ways of implementing this - but do you see 
> what I mean?
> 
> Cheers,
> David
> 
> Rob Davies wrote:
>> David,
>>
>> which release are you working on?  There was a change last night in 
>> Queues that might affect the CPU usage.
>> On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
>>
>>> In my application, I have noticed with 20 consumers, the broker's CPU 
>>> is going through the roof, with many threads in 
>>> PrefetchSubscription.dispatchPending().  With my consumers, it might 
>>> be 500-1000 messages dispatched before a commit() can be called.  
>>> With 20 consumers, this means there can be a build-up of 20,000 
>>> uncommitted messages lying around the system, let alone the new 
>>> messages which are being pumped into the system at a furious rate.  
>>> Not nice I know, but I don't have much choice about it at the moment, 
>>> for application-specific reasons.
>>>
>>> As you can imagine, I can have some very big pending queue sizes - 
>>> sometimes 100,000 in size.
>>>
>>> I am experimenting with different prefetch sizes which may help, but 
>>> I suspect every time a prefetch thread is trying to dispatch a 
>>> message, it might have to iterate through very large numbers of 
>>> deleted messages or messages which have been claimed by other 
>>> subscribers before it finds a matching message.  Multiply this by 20, 
>>> and there is a lot of CPU being consumed.  This worries me for 
>>> scalability reasons - if I want to keep bumping up the number of 
>>> consumers.
>>>
>>> I'm not sure what the best way of improving this is... is it possible 
>>> when we call dispatchPending() to not call 
>>> pendingMessageCursor.reset() perhaps?
>> reset() is a nop for the QueueStoreCursor :(
>>>
>>>
>>> I'm trying to understand why we need to reset the cursor, when 
>>> presumably all of the messages we have gone over before in a 
>>> previous dispatchPending() call are either deleted, dispatched or 
>>> locked by another node, and therefore don't need to be checked again 
>>> (or we check if we reach the end of the cursor list)?
>> I
>>>
>>>
>>> I realise if a transaction is rolled back, that a message that was 
>>> previously locked by another consumer may be freed.  There are 
>>> probably message ordering issues too.
>>>
>>> Is it possible when we are iterating through the cursor if we find a 
>>> node locked by another consumer to perhaps move it to the end of the 
>>> cursor (or another list) and check it only if we found no matches?
>>>
>>> I'm sure there are a lot of complexities here I am not aware of - but 
>>> I am curious what others think.
>>>
>>> Doing this sort of change should reduce the latencies and CPU usage 
>>> of the broker significantly.
>>>
>>> Cheers,
>>> David
>>>
>>>
> 
> 


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by David Sitsky <si...@nuix.com>.
Hi Rob,

I was using a version that did have your most recent changes.

To give you a better idea of what I meant, I hacked up some changes 
which you can see from the attached patch.

The idea is, instead of going through the pending list performing the 
same computations over and over again on messages which have already 
been handled by other subscriptions, to move them to another list.

For a particular run, this reduced my application run-time from 47 
minutes to 38 minutes.

I'm sure there are better ways of implementing this - but do you see 
what I mean?

Cheers,
David

Rob Davies wrote:
> David,
> 
> which release are you working on?  There was a change last night in 
> Queues that might affect the CPU usage.
> On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
> 
>> In my application, I have noticed with 20 consumers, the broker's CPU 
>> is going through the roof, with many threads in 
>> PrefetchSubscription.dispatchPending().  With my consumers, it might 
>> be 500-1000 messages dispatched before a commit() can be called.  With 
>> 20 consumers, this means there can be a build-up of 20,000 uncommitted 
>> messages lying around the system, let alone the new messages which are 
>> being pumped into the system at a furious rate.  Not nice I know, but 
>> I don't have much choice about it at the moment, for 
>> application-specific reasons.
>>
>> As you can imagine, I can have some very big pending queue sizes - 
>> sometimes 100,000 in size.
>>
>> I am experimenting with different prefetch sizes which may help, but I 
>> suspect every time a prefetch thread is trying to dispatch a message, 
>> it might have to iterate through very large numbers of deleted 
>> messages or messages which have been claimed by other subscribers 
>> before it finds a matching message.  Multiply this by 20, and there is 
>> a lot of CPU being consumed.  This worries me for scalability reasons 
>> - if I want to keep bumping up the number of consumers.
>>
>> I'm not sure what the best way of improving this is... is it possible 
>> when we call dispatchPending() to not call 
>> pendingMessageCursor.reset() perhaps?
> reset() is a nop for the QueueStoreCursor :(
>>
>>
>> I'm trying to understand why we need to reset the cursor, when 
>> presumably all of the messages we have gone over before in a previous 
>> dispatchPending() call are either deleted, dispatched or locked by 
>> another node, and therefore don't need to be checked again (or we 
>> check if we reach the end of the cursor list)?
> I
>>
>>
>> I realise if a transaction is rolled back, that a message that was 
>> previously locked by another consumer may be freed.  There are 
>> probably message ordering issues too.
>>
>> Is it possible when we are iterating through the cursor if we find a 
>> node locked by another consumer to perhaps move it to the end of the 
>> cursor (or another list) and check it only if we found no matches?
>>
>> I'm sure there are a lot of complexities here I am not aware of - but 
>> I am curious what others think.
>>
>> Doing this sort of change should reduce the latencies and CPU usage of 
>> the broker significantly.
>>
>> Cheers,
>> David
>>
>>


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Optimising PrefetchSubscription.dispatchPending() ideas

Posted by Rob Davies <ra...@gmail.com>.
David,

which release are you working on?  There was a change last night in 
Queues that might affect the CPU usage.
On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:

> In my application, I have noticed with 20 consumers, the broker's  
> CPU is going through the roof, with many threads in  
> PrefetchSubscription.dispatchPending().  With my consumers, it might  
> be 500-1000 messages dispatched before a commit() can be called.   
> With 20 consumers, this means there can be a build-up of 20,000  
> uncommitted messages lying around the system, let alone the new  
> messages which are being pumped into the system at a furious rate.   
> Not nice I know, but I don't have much choice about it at the  
> moment, for application-specific reasons.
>
> As you can imagine, I can have some very big pending queue sizes -  
> sometimes 100,000 in size.
>
> I am experimenting with different prefetch sizes which may help, but  
> I suspect every time a prefetch thread is trying to dispatch a  
> message, it might have to iterate through very large numbers of  
> deleted messages or messages which have been claimed by other  
> subscribers before it finds a matching message.  Multiply this by  
> 20, and there is a lot of CPU being consumed.  This worries me for  
> scalability reasons - if I want to keep bumping up the number of  
> consumers.
>
> I'm not sure what the best way of improving this is... is it  
> possible when we call dispatchPending() to not call  
> pendingMessageCursor.reset() perhaps?
reset() is a nop for the QueueStoreCursor :(
>
>
> I'm trying to understand why we need to reset the cursor, when  
> presumably all of the messages we have gone over before in a  
> previous dispatchPending() call are either deleted, dispatched or  
> locked by another node, and therefore don't need to be checked again  
> (or we check if we reach the end of the cursor list)?
I
>
>
> I realise if a transaction is rolled back, that a message that was  
> previously locked by another consumer may be freed.  There are  
> probably message ordering issues too.
>
> Is it possible when we are iterating through the cursor if we find a  
> node locked by another consumer to perhaps move it to the end of the  
> cursor (or another list) and check it only if we found no matches?
>
> I'm sure there are a lot of complexities here I am not aware of -  
> but I am curious what others think.
>
> Doing this sort of change should reduce the latencies and CPU usage  
> of the broker significantly.
>
> Cheers,
> David
>
>