You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by David Sitsky <si...@nuix.com> on 2008/03/06 06:54:58 UTC

Re: Queue performance from recent changes

Hi Rob,

I know its been a couple of weeks.  I've been using my changes for a 
while and I see nice CPU and memory usage on the broker, and good 
messaging performance for my application.  Have you had a chance to try 
it out?

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> thanks for the great feedback - will try your patch and see how it works!
> 
> cheers,
> 
> Rob
> On 20 Feb 2008, at 06:31, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I like the new changes, but with the changes as they are, for my 
>> application for one of my benchmarks, it takes twice as long to complete.
>>
>> I believe the culprit for this is that when the new code can't find a 
>> consumer which is not full, the broker chooses the consumer with the 
>> lowest dispatch queue size.
>>
>> In my application, since I have a prefetch size of 1, and have 
>> longish-running transactions, the dispatch queue size is not 
>> indicative of the current load for that consumer.  As a result, I 
>> think this is what is responsible for poor load-balancing in my case.
>>
>> For applications which commit() after each processed message, I am 
>> sure this wouldn't be the case.  In some ways, reverting to the old 
>> behaviour of adding the pending message to all consumers might lead to 
>> better load balancing with this code.
>>
>> However - I think it is better if the consumers can decide when they 
>> want more messages rather than the broker pushing messages at them? 
>> I've attached a patch which demonstrates this.  When LAZY_DISPATCH is 
>> set to true (set via a system property for now for testing purposes) 
>> this changes the behaviour slightly.
>>
>> The basic idea is pageInMessages() only pages in the minimum number of 
>> messages that can be dispatched immediately to non-full consumers. 
>> Whenever a consumer acks a message, which updates its prefetch size, 
>> we make sure Queue.wakeup() is called so that the consumer will 
>> receive new messages.
>>
>> With this change in effect - I see slightly faster or almost the same 
>> times with the previous benchmark.  However memory usage on the broker 
>> is far better, as the pending queues for each consumer is either 0 or 
>> very small.
>>
>> What do you think?  I guess there are better ways of doing this.
>>
>> I am doing a large overnight run with 16 consumers, so we'll see how 
>> the  performance goes.
>>
>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>> thought there didn't seem to be any need for adding a message to a new 
>> consumer if the message has already been locked by another consumer?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> please let us know if these changes helps/hinders your app!
>>> cheers,
>>> Rob
>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>> If what I said above is true, then the immediately above if 
>>>>>> statement needs to be moved outside its enclosing if - otherwise 
>>>>>> it only gets executed when targets != null.  We'd want this to 
>>>>>> execute if we found a matching target wouldn't we?
>>>>> Don't think so? We only want the message going to  one 
>>>>> subscription? I may have misunderstood what you mean!
>>>> Yes - ignore what I said, I had my wires crossed.
>>>>
>>>> Cheers,
>>>> David
>>>>
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>> Index: 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java 
>>
>> ===================================================================
>> --- 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>> (revision 628917)
>> +++ 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>> (working copy)
>> @@ -160,6 +160,8 @@
>>     public  void acknowledge(final ConnectionContext context,final 
>> MessageAck ack) throws Exception {
>>         // Handle the standard acknowledgment case.
>>         boolean callDispatchMatched = false;
>> +    Queue queue = null;
>> +   
>>         synchronized(dispatchLock) {
>>             if (ack.isStandardAck()) {
>>                 // Acknowledge all dispatched messages up till the 
>> message id of
>> @@ -223,8 +225,12 @@
>>                                 prefetchExtension = Math.max(0,
>>                                         prefetchExtension - (index + 1));
>>                             }
>> +                if (queue == null)
>> +                {
>> +                queue = (Queue)node.getRegionDestination();
>> +                }
>>                             callDispatchMatched = true;
>> -                            break;
>> +                break;
>>                         }
>>                     }
>>                 }
>> @@ -253,6 +259,10 @@
>>                     if 
>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>                         prefetchExtension = Math.max(prefetchExtension,
>>                                 index + 1);
>> +                        if (queue == null)
>> +                        {
>> +                            queue = (Queue)node.getRegionDestination();
>> +                        }
>>                         callDispatchMatched = true;
>>                         break;
>>                     }
>> @@ -279,6 +289,10 @@
>>                     if (inAckRange) {
>>                         node.incrementRedeliveryCounter();
>>                         if (ack.getLastMessageId().equals(messageId)) {
>> +                if (queue == null)
>> +                {
>> +                queue = (Queue)node.getRegionDestination();
>> +                }
>>                             callDispatchMatched = true;
>>                             break;
>>                         }
>> @@ -320,6 +334,10 @@
>>                         if (ack.getLastMessageId().equals(messageId)) {
>>                             prefetchExtension = Math.max(0, 
>> prefetchExtension
>>                                     - (index + 1));
>> +                if (queue == null)
>> +                {
>> +                queue = (Queue)node.getRegionDestination();
>> +                }
>>                             callDispatchMatched = true;
>>                             break;
>>                         }
>> @@ -336,6 +354,9 @@
>>             }
>>         }
>>         if (callDispatchMatched) {
>> +        if (Queue.LAZY_DISPATCH) {
>> +        queue.wakeup();
>> +        }
>>             dispatchPending();
>>         } else {
>>             if (isSlave()) {
>> Index: 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>> ===================================================================
>> --- 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>> (revision 628917)
>> +++ 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>> (working copy)
>> @@ -75,6 +75,8 @@
>>  * @version $Revision: 1.28 $
>>  */
>> public class Queue extends BaseDestination implements Task {
>> +    public static final boolean LAZY_DISPATCH =
>> +    Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>> "true"));
>>     private final Log log;
>>     private final List<Subscription> consumers = new 
>> ArrayList<Subscription>(50);
>>     private PendingMessageCursor messages;
>> @@ -212,12 +214,12 @@
>>             synchronized (pagedInMessages) {
>>                 // Add all the matching messages in the queue to the
>>                 // subscription.
>> -
>>                 for (Iterator<MessageReference> i = 
>> pagedInMessages.values()
>>                         .iterator(); i.hasNext();) {
>>                     QueueMessageReference node = 
>> (QueueMessageReference) i
>>                             .next();
>> -                    if (!node.isDropped() && !node.isAcked() && 
>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>> +                    if ((!node.isDropped() || 
>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>> +            node.getLockOwner() == null) {
>>                         msgContext.setMessageReference(node);
>>                         if (sub.matches(node, msgContext)) {
>>                             sub.add(node);
>> @@ -940,7 +945,11 @@
>>         dispatchLock.lock();
>>         try{
>>
>> -            final int toPageIn = getMaxPageSize() - 
>> pagedInMessages.size();
>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>> +        if (LAZY_DISPATCH) {
>> +        // Only page in the minimum number of messages which can be 
>> dispatched immediately.
>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>> toPageIn);
>> +        }
>>             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>                 messages.setMaxBatchSize(toPageIn);
>>                 int count = 0;
>> @@ -976,12 +985,25 @@
>>         }
>>         return result;
>>     }
>> +
>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>> +    int total = 0;
>> +        synchronized (consumers) {
>> +            for (Subscription s : consumers) {
>> +        if (s instanceof PrefetchSubscription) {
>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>> +        }
>> +        }
>> +    }
>> +    return total;
>> +    }
>>
>>     private void doDispatch(List<MessageReference> list) throws 
>> Exception {
>>
>>         if (list != null) {
>>             synchronized (consumers) {
>>                 for (MessageReference node : list) {
>> +
>>                     Subscription target = null;
>>                     List<Subscription> targets = null;
>>                     for (Subscription s : consumers) {


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Queue performance from recent changes

Posted by David Sitsky <si...@nuix.com>.
James Strachan wrote:
> On 06/03/2008, David Sitsky <si...@nuix.com> wrote:
>> I am sure it will be application-dependent, so making it a policy makes
>>  a lot of sense.  For my application, I only have a pending size of 1
>>  since each work item's processing requirements can vary tremendously.
> 
> I wonder could the same code be smart enough to work in the 2
> different modes based on the prefetch size?
> 
> i.e. use the default if the consumers's prefetch size is > 100 or
> something or use David's approach if its smaller
> 
> If not then using destination policies sounds fine to me; just
> wondered if we could be smart enough to use the right policy based on
> the consumer configuration?

I think it is very much application-dependent - and it is based on more 
than just the prefetch size.

In my situation, I may have 500,000 messages in my system that need to 
be delivered, but I don't want them to be delivered to pending queues 
unnecessarily, since it may be some time before the consumers have a 
chance to eat them up.  I also need a large queue page size since I 
can't do a commit() after each message received.  So I also have a lot 
of uncommitted messages floating about the system - maybe 24,000 at a 
given time.

I really need the requirement for only putting a message to a consumer's 
pending queue when it can process it.  Otherwise I found the pending 
queues for each consumer would grow to be extremely large, consuming 
unnecessary CPU and memory resources.  With my changes, the broker's 
usage was kept nice and small.

A lot of this may only occur for applications that have a large queue 
page size.

It feels right for this to be a policy option... I know how complex 
different messaging application performance requirements can be!

Cheers,
David








-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Queue performance from recent changes

Posted by James Strachan <ja...@gmail.com>.
On 06/03/2008, David Sitsky <si...@nuix.com> wrote:
> I am sure it will be application-dependent, so making it a policy makes
>  a lot of sense.  For my application, I only have a pending size of 1
>  since each work item's processing requirements can vary tremendously.

I wonder could the same code be smart enough to work in the 2
different modes based on the prefetch size?

i.e. use the default if the consumers's prefetch size is > 100 or
something or use David's approach if its smaller

If not then using destination policies sounds fine to me; just
wondered if we could be smart enough to use the right policy based on
the consumer configuration?

-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

Re: Queue performance from recent changes

Posted by Rob Davies <ra...@gmail.com>.
Hi Dave,

there will still be some contention there - but as this has been  
greatly reduced since the beginning of the year - I am hoping that it  
will be more than countered by not having to go through the extra  
thread for dispatching? Be interested in seeing if its better or worse  
for your case.

cheers,

Rob
On 10 Mar 2008, at 06:07, David Sitsky wrote:

> Hi Rob,
>
> I'll give it a go in the next day or two when I get some spare time.  
> From what I can see, we call iterate() inline rather than delegating  
> it to the dedicated "wakeup" thread when optimizeDispatch is set.
>
> From memory, this caused contention with synchronisation blocks last  
> time - as many consumers would end up calling iterate().  Would this  
> still not be an issue?  I must admit I'd need to check the code  
> closely.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> David,
>> you might like to try enabling the optimizeDispatch property on the  
>> Destination policy map - see http://activemq.apache.org/configure-version-5-brokers.html 
>>  from trunk, if you are using non-persistent messages
>> cheers,
>> Rob
>> On 6 Mar 2008, at 22:48, David Sitsky wrote:
>>> I am sure it will be application-dependent, so making it a policy  
>>> makes a lot of sense.  For my application, I only have a pending  
>>> size of 1 since each work item's processing requirements can vary  
>>> tremendously.
>>>
>>> Just curious - what kind of benchmarks did you run this against?   
>>> I'm curious to know what kind of performance degregation you saw..  
>>> it would be interesting to understand why.  I am using non- 
>>> persistent messaging, so perhaps that could make a difference,  
>>> since I am only paging a small number of messages in at a time.
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> Yes - actually - I tried it a few days ago. I haven't committed  
>>>> it because message throughput is generally lower. I will look at  
>>>> making it optional via a destination policy
>>>> cheers,
>>>> Rob
>>>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I know its been a couple of weeks.  I've been using my changes  
>>>>> for a while and I see nice CPU and memory usage on the broker,  
>>>>> and good messaging performance for my application.  Have you had  
>>>>> a chance to try it out?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> thanks for the great feedback - will try your patch and see how  
>>>>>> it works!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>>>> Hi Rob,
>>>>>>>
>>>>>>> I like the new changes, but with the changes as they are, for  
>>>>>>> my application for one of my benchmarks, it takes twice as  
>>>>>>> long to complete.
>>>>>>>
>>>>>>> I believe the culprit for this is that when the new code can't  
>>>>>>> find a consumer which is not full, the broker chooses the  
>>>>>>> consumer with the lowest dispatch queue size.
>>>>>>>
>>>>>>> In my application, since I have a prefetch size of 1, and have  
>>>>>>> longish-running transactions, the dispatch queue size is not  
>>>>>>> indicative of the current load for that consumer.  As a  
>>>>>>> result, I think this is what is responsible for poor load- 
>>>>>>> balancing in my case.
>>>>>>>
>>>>>>> For applications which commit() after each processed message,  
>>>>>>> I am sure this wouldn't be the case.  In some ways, reverting  
>>>>>>> to the old behaviour of adding the pending message to all  
>>>>>>> consumers might lead to better load balancing with this code.
>>>>>>>
>>>>>>> However - I think it is better if the consumers can decide  
>>>>>>> when they want more messages rather than the broker pushing  
>>>>>>> messages at them? I've attached a patch which demonstrates  
>>>>>>> this.  When LAZY_DISPATCH is set to true (set via a system  
>>>>>>> property for now for testing purposes) this changes the  
>>>>>>> behaviour slightly.
>>>>>>>
>>>>>>> The basic idea is pageInMessages() only pages in the minimum  
>>>>>>> number of messages that can be dispatched immediately to non- 
>>>>>>> full consumers. Whenever a consumer acks a message, which  
>>>>>>> updates its prefetch size, we make sure Queue.wakeup() is  
>>>>>>> called so that the consumer will receive new messages.
>>>>>>>
>>>>>>> With this change in effect - I see slightly faster or almost  
>>>>>>> the same times with the previous benchmark.  However memory  
>>>>>>> usage on the broker is far better, as the pending queues for  
>>>>>>> each consumer is either 0 or very small.
>>>>>>>
>>>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>>>
>>>>>>> I am doing a large overnight run with 16 consumers, so we'll  
>>>>>>> see how the  performance goes.
>>>>>>>
>>>>>>> You'll also notice in the patch, that in  
>>>>>>> Queue.addSubscriber(), I thought there didn't seem to be any  
>>>>>>> need for adding a message to a new consumer if the message has  
>>>>>>> already been locked by another consumer?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>>> Rob Davies wrote:
>>>>>>>> Hi David,
>>>>>>>> please let us know if these changes helps/hinders your app!
>>>>>>>> cheers,
>>>>>>>> Rob
>>>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>>>> If what I said above is true, then the immediately above  
>>>>>>>>>>> if statement needs to be moved outside its enclosing if -  
>>>>>>>>>>> otherwise it only gets executed when targets != null.   
>>>>>>>>>>> We'd want this to execute if we found a matching target  
>>>>>>>>>>> wouldn't we?
>>>>>>>>>> Don't think so? We only want the message going to  one  
>>>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> David
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -- 
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>>> Nuix Pty Ltd
>>>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>>>>>> 9280 0699
>>>>>>> Web: http://www.nuix.com                            Fax: +61 2  
>>>>>>> 9212 6902
>>>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>>>> region/PrefetchSubscription.java
>>>>>>> = 
>>>>>>> = 
>>>>>>> = 
>>>>>>> ================================================================
>>>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>>>> region/PrefetchSubscription.java    (revision 628917)
>>>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>>>> region/PrefetchSubscription.java    (working copy)
>>>>>>> @@ -160,6 +160,8 @@
>>>>>>>  public  void acknowledge(final ConnectionContext  
>>>>>>> context,final MessageAck ack) throws Exception {
>>>>>>>      // Handle the standard acknowledgment case.
>>>>>>>      boolean callDispatchMatched = false;
>>>>>>> +    Queue queue = null;
>>>>>>> +           synchronized(dispatchLock) {
>>>>>>>          if (ack.isStandardAck()) {
>>>>>>>              // Acknowledge all dispatched messages up till  
>>>>>>> the message id of
>>>>>>> @@ -223,8 +225,12 @@
>>>>>>>                              prefetchExtension = Math.max(0,
>>>>>>>                                      prefetchExtension -  
>>>>>>> (index + 1));
>>>>>>>                          }
>>>>>>> +                if (queue == null)
>>>>>>> +                {
>>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>>> +                }
>>>>>>>                          callDispatchMatched = true;
>>>>>>> -                            break;
>>>>>>> +                break;
>>>>>>>                      }
>>>>>>>                  }
>>>>>>>              }
>>>>>>> @@ -253,6 +259,10 @@
>>>>>>>                  if  
>>>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>>>                      prefetchExtension =  
>>>>>>> Math.max(prefetchExtension,
>>>>>>>                              index + 1);
>>>>>>> +                        if (queue == null)
>>>>>>> +                        {
>>>>>>> +                            queue =  
>>>>>>> (Queue)node.getRegionDestination();
>>>>>>> +                        }
>>>>>>>                      callDispatchMatched = true;
>>>>>>>                      break;
>>>>>>>                  }
>>>>>>> @@ -279,6 +289,10 @@
>>>>>>>                  if (inAckRange) {
>>>>>>>                      node.incrementRedeliveryCounter();
>>>>>>>                      if  
>>>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>>>> +                if (queue == null)
>>>>>>> +                {
>>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>>> +                }
>>>>>>>                          callDispatchMatched = true;
>>>>>>>                          break;
>>>>>>>                      }
>>>>>>> @@ -320,6 +334,10 @@
>>>>>>>                      if  
>>>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>>>>                          prefetchExtension = Math.max(0,  
>>>>>>> prefetchExtension
>>>>>>>                                  - (index + 1));
>>>>>>> +                if (queue == null)
>>>>>>> +                {
>>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>>> +                }
>>>>>>>                          callDispatchMatched = true;
>>>>>>>                          break;
>>>>>>>                      }
>>>>>>> @@ -336,6 +354,9 @@
>>>>>>>          }
>>>>>>>      }
>>>>>>>      if (callDispatchMatched) {
>>>>>>> +        if (Queue.LAZY_DISPATCH) {
>>>>>>> +        queue.wakeup();
>>>>>>> +        }
>>>>>>>          dispatchPending();
>>>>>>>      } else {
>>>>>>>          if (isSlave()) {
>>>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>>>> region/Queue.java
>>>>>>> = 
>>>>>>> = 
>>>>>>> = 
>>>>>>> ================================================================
>>>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>>>> region/Queue.java    (revision 628917)
>>>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>>>> region/Queue.java    (working copy)
>>>>>>> @@ -75,6 +75,8 @@
>>>>>>> * @version $Revision: 1.28 $
>>>>>>> */
>>>>>>> public class Queue extends BaseDestination implements Task {
>>>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>>>> +     
>>>>>>> Boolean 
>>>>>>> .parseBoolean(System.getProperty("activemq.lazy.dispatch",  
>>>>>>> "true"));
>>>>>>>  private final Log log;
>>>>>>>  private final List<Subscription> consumers = new  
>>>>>>> ArrayList<Subscription>(50);
>>>>>>>  private PendingMessageCursor messages;
>>>>>>> @@ -212,12 +214,12 @@
>>>>>>>          synchronized (pagedInMessages) {
>>>>>>>              // Add all the matching messages in the queue to  
>>>>>>> the
>>>>>>>              // subscription.
>>>>>>> -
>>>>>>>              for (Iterator<MessageReference> i =  
>>>>>>> pagedInMessages.values()
>>>>>>>                      .iterator(); i.hasNext();) {
>>>>>>>                  QueueMessageReference node =  
>>>>>>> (QueueMessageReference) i
>>>>>>>                          .next();
>>>>>>> -                    if (!node.isDropped() && !node.isAcked()  
>>>>>>> && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>>>> +                    if ((!node.isDropped() ||  
>>>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>>>> +            node.getLockOwner() == null) {
>>>>>>>                      msgContext.setMessageReference(node);
>>>>>>>                      if (sub.matches(node, msgContext)) {
>>>>>>>                          sub.add(node);
>>>>>>> @@ -940,7 +945,11 @@
>>>>>>>      dispatchLock.lock();
>>>>>>>      try{
>>>>>>>
>>>>>>> -            final int toPageIn = getMaxPageSize() -  
>>>>>>> pagedInMessages.size();
>>>>>>> +            int toPageIn = getMaxPageSize() -  
>>>>>>> pagedInMessages.size();
>>>>>>> +        if (LAZY_DISPATCH) {
>>>>>>> +        // Only page in the minimum number of messages which  
>>>>>>> can be dispatched immediately.
>>>>>>> +        toPageIn =  
>>>>>>> Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>>>> +        }
>>>>>>>          if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>>>              messages.setMaxBatchSize(toPageIn);
>>>>>>>              int count = 0;
>>>>>>> @@ -976,12 +985,25 @@
>>>>>>>      }
>>>>>>>      return result;
>>>>>>>  }
>>>>>>> +
>>>>>>> +    private int getConsumerMessageCountBeforeFull() throws  
>>>>>>> Exception {
>>>>>>> +    int total = 0;
>>>>>>> +        synchronized (consumers) {
>>>>>>> +            for (Subscription s : consumers) {
>>>>>>> +        if (s instanceof PrefetchSubscription) {
>>>>>>> +            total +=  
>>>>>>> ((PrefetchSubscription)s).countBeforeFull();
>>>>>>> +        }
>>>>>>> +        }
>>>>>>> +    }
>>>>>>> +    return total;
>>>>>>> +    }
>>>>>>>
>>>>>>>  private void doDispatch(List<MessageReference> list) throws  
>>>>>>> Exception {
>>>>>>>
>>>>>>>      if (list != null) {
>>>>>>>          synchronized (consumers) {
>>>>>>>              for (MessageReference node : list) {
>>>>>>> +
>>>>>>>                  Subscription target = null;
>>>>>>>                  List<Subscription> targets = null;
>>>>>>>                  for (Subscription s : consumers) {
>>>>>
>>>>>
>>>>> -- 
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>>>> 9280 0699
>>>>> Web: http://www.nuix.com                            Fax: +61 2  
>>>>> 9212 6902
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>> 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2  
>>> 9212 6902
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902


Re: Queue performance from recent changes

Posted by David Sitsky <si...@nuix.com>.
Hi Rob,

I'll give it a go in the next day or two when I get some spare time. 
 From what I can see, we call iterate() inline rather than delegating it 
to the dedicated "wakeup" thread when optimizeDispatch is set.

 From memory, this caused contention with synchronisation blocks last 
time - as many consumers would end up calling iterate().  Would this 
still not be an issue?  I must admit I'd need to check the code closely.

Cheers,
David

Rob Davies wrote:
> David,
> 
> you might like to try enabling the optimizeDispatch property on the 
> Destination policy map - see 
> http://activemq.apache.org/configure-version-5-brokers.html from trunk, 
> if you are using non-persistent messages
> 
> cheers,
> 
> Rob
> On 6 Mar 2008, at 22:48, David Sitsky wrote:
> 
>> I am sure it will be application-dependent, so making it a policy 
>> makes a lot of sense.  For my application, I only have a pending size 
>> of 1 since each work item's processing requirements can vary 
>> tremendously.
>>
>> Just curious - what kind of benchmarks did you run this against?  I'm 
>> curious to know what kind of performance degregation you saw.. it 
>> would be interesting to understand why.  I am using non-persistent 
>> messaging, so perhaps that could make a difference, since I am only 
>> paging a small number of messages in at a time.
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> Yes - actually - I tried it a few days ago. I haven't committed it 
>>> because message throughput is generally lower. I will look at making 
>>> it optional via a destination policy
>>> cheers,
>>> Rob
>>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I know its been a couple of weeks.  I've been using my changes for a 
>>>> while and I see nice CPU and memory usage on the broker, and good 
>>>> messaging performance for my application.  Have you had a chance to 
>>>> try it out?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> thanks for the great feedback - will try your patch and see how it 
>>>>> works!
>>>>> cheers,
>>>>> Rob
>>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>>> Hi Rob,
>>>>>>
>>>>>> I like the new changes, but with the changes as they are, for my 
>>>>>> application for one of my benchmarks, it takes twice as long to 
>>>>>> complete.
>>>>>>
>>>>>> I believe the culprit for this is that when the new code can't 
>>>>>> find a consumer which is not full, the broker chooses the consumer 
>>>>>> with the lowest dispatch queue size.
>>>>>>
>>>>>> In my application, since I have a prefetch size of 1, and have 
>>>>>> longish-running transactions, the dispatch queue size is not 
>>>>>> indicative of the current load for that consumer.  As a result, I 
>>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>>
>>>>>> For applications which commit() after each processed message, I am 
>>>>>> sure this wouldn't be the case.  In some ways, reverting to the 
>>>>>> old behaviour of adding the pending message to all consumers might 
>>>>>> lead to better load balancing with this code.
>>>>>>
>>>>>> However - I think it is better if the consumers can decide when 
>>>>>> they want more messages rather than the broker pushing messages at 
>>>>>> them? I've attached a patch which demonstrates this.  When 
>>>>>> LAZY_DISPATCH is set to true (set via a system property for now 
>>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>>
>>>>>> The basic idea is pageInMessages() only pages in the minimum 
>>>>>> number of messages that can be dispatched immediately to non-full 
>>>>>> consumers. Whenever a consumer acks a message, which updates its 
>>>>>> prefetch size, we make sure Queue.wakeup() is called so that the 
>>>>>> consumer will receive new messages.
>>>>>>
>>>>>> With this change in effect - I see slightly faster or almost the 
>>>>>> same times with the previous benchmark.  However memory usage on 
>>>>>> the broker is far better, as the pending queues for each consumer 
>>>>>> is either 0 or very small.
>>>>>>
>>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>>
>>>>>> I am doing a large overnight run with 16 consumers, so we'll see 
>>>>>> how the  performance goes.
>>>>>>
>>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>>>>>> thought there didn't seem to be any need for adding a message to a 
>>>>>> new consumer if the message has already been locked by another 
>>>>>> consumer?
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Rob Davies wrote:
>>>>>>> Hi David,
>>>>>>> please let us know if these changes helps/hinders your app!
>>>>>>> cheers,
>>>>>>> Rob
>>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>>> If what I said above is true, then the immediately above if 
>>>>>>>>>> statement needs to be moved outside its enclosing if - 
>>>>>>>>>> otherwise it only gets executed when targets != null.  We'd 
>>>>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>>>>> Don't think so? We only want the message going to  one 
>>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> David
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> -- 
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Nuix Pty Ltd
>>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 
>>>>>> 9280 0699
>>>>>> Web: http://www.nuix.com                            Fax: +61 2 
>>>>>> 9212 6902
>>>>>> Index: 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java 
>>>>>>
>>>>>> ===================================================================
>>>>>> --- 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>>>> (revision 628917)
>>>>>> +++ 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>>>> (working copy)
>>>>>> @@ -160,6 +160,8 @@
>>>>>>   public  void acknowledge(final ConnectionContext context,final 
>>>>>> MessageAck ack) throws Exception {
>>>>>>       // Handle the standard acknowledgment case.
>>>>>>       boolean callDispatchMatched = false;
>>>>>> +    Queue queue = null;
>>>>>> +           synchronized(dispatchLock) {
>>>>>>           if (ack.isStandardAck()) {
>>>>>>               // Acknowledge all dispatched messages up till the 
>>>>>> message id of
>>>>>> @@ -223,8 +225,12 @@
>>>>>>                               prefetchExtension = Math.max(0,
>>>>>>                                       prefetchExtension - (index + 
>>>>>> 1));
>>>>>>                           }
>>>>>> +                if (queue == null)
>>>>>> +                {
>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>> +                }
>>>>>>                           callDispatchMatched = true;
>>>>>> -                            break;
>>>>>> +                break;
>>>>>>                       }
>>>>>>                   }
>>>>>>               }
>>>>>> @@ -253,6 +259,10 @@
>>>>>>                   if 
>>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>>                       prefetchExtension = Math.max(prefetchExtension,
>>>>>>                               index + 1);
>>>>>> +                        if (queue == null)
>>>>>> +                        {
>>>>>> +                            queue = 
>>>>>> (Queue)node.getRegionDestination();
>>>>>> +                        }
>>>>>>                       callDispatchMatched = true;
>>>>>>                       break;
>>>>>>                   }
>>>>>> @@ -279,6 +289,10 @@
>>>>>>                   if (inAckRange) {
>>>>>>                       node.incrementRedeliveryCounter();
>>>>>>                       if (ack.getLastMessageId().equals(messageId)) {
>>>>>> +                if (queue == null)
>>>>>> +                {
>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>> +                }
>>>>>>                           callDispatchMatched = true;
>>>>>>                           break;
>>>>>>                       }
>>>>>> @@ -320,6 +334,10 @@
>>>>>>                       if (ack.getLastMessageId().equals(messageId)) {
>>>>>>                           prefetchExtension = Math.max(0, 
>>>>>> prefetchExtension
>>>>>>                                   - (index + 1));
>>>>>> +                if (queue == null)
>>>>>> +                {
>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>> +                }
>>>>>>                           callDispatchMatched = true;
>>>>>>                           break;
>>>>>>                       }
>>>>>> @@ -336,6 +354,9 @@
>>>>>>           }
>>>>>>       }
>>>>>>       if (callDispatchMatched) {
>>>>>> +        if (Queue.LAZY_DISPATCH) {
>>>>>> +        queue.wakeup();
>>>>>> +        }
>>>>>>           dispatchPending();
>>>>>>       } else {
>>>>>>           if (isSlave()) {
>>>>>> Index: 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java 
>>>>>>
>>>>>> ===================================================================
>>>>>> --- 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>>>> (revision 628917)
>>>>>> +++ 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>>>> (working copy)
>>>>>> @@ -75,6 +75,8 @@
>>>>>> * @version $Revision: 1.28 $
>>>>>> */
>>>>>> public class Queue extends BaseDestination implements Task {
>>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>>> +    
>>>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>>>>>> "true"));
>>>>>>   private final Log log;
>>>>>>   private final List<Subscription> consumers = new 
>>>>>> ArrayList<Subscription>(50);
>>>>>>   private PendingMessageCursor messages;
>>>>>> @@ -212,12 +214,12 @@
>>>>>>           synchronized (pagedInMessages) {
>>>>>>               // Add all the matching messages in the queue to the
>>>>>>               // subscription.
>>>>>> -
>>>>>>               for (Iterator<MessageReference> i = 
>>>>>> pagedInMessages.values()
>>>>>>                       .iterator(); i.hasNext();) {
>>>>>>                   QueueMessageReference node = 
>>>>>> (QueueMessageReference) i
>>>>>>                           .next();
>>>>>> -                    if (!node.isDropped() && !node.isAcked() && 
>>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>>> +                    if ((!node.isDropped() || 
>>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>>> +            node.getLockOwner() == null) {
>>>>>>                       msgContext.setMessageReference(node);
>>>>>>                       if (sub.matches(node, msgContext)) {
>>>>>>                           sub.add(node);
>>>>>> @@ -940,7 +945,11 @@
>>>>>>       dispatchLock.lock();
>>>>>>       try{
>>>>>>
>>>>>> -            final int toPageIn = getMaxPageSize() - 
>>>>>> pagedInMessages.size();
>>>>>> +            int toPageIn = getMaxPageSize() - 
>>>>>> pagedInMessages.size();
>>>>>> +        if (LAZY_DISPATCH) {
>>>>>> +        // Only page in the minimum number of messages which can 
>>>>>> be dispatched immediately.
>>>>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>>>>>> toPageIn);
>>>>>> +        }
>>>>>>           if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>>               messages.setMaxBatchSize(toPageIn);
>>>>>>               int count = 0;
>>>>>> @@ -976,12 +985,25 @@
>>>>>>       }
>>>>>>       return result;
>>>>>>   }
>>>>>> +
>>>>>> +    private int getConsumerMessageCountBeforeFull() throws 
>>>>>> Exception {
>>>>>> +    int total = 0;
>>>>>> +        synchronized (consumers) {
>>>>>> +            for (Subscription s : consumers) {
>>>>>> +        if (s instanceof PrefetchSubscription) {
>>>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>>>> +        }
>>>>>> +        }
>>>>>> +    }
>>>>>> +    return total;
>>>>>> +    }
>>>>>>
>>>>>>   private void doDispatch(List<MessageReference> list) throws 
>>>>>> Exception {
>>>>>>
>>>>>>       if (list != null) {
>>>>>>           synchronized (consumers) {
>>>>>>               for (MessageReference node : list) {
>>>>>> +
>>>>>>                   Subscription target = null;
>>>>>>                   List<Subscription> targets = null;
>>>>>>                   for (Subscription s : consumers) {
>>>>
>>>>
>>>> -- 
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 
>>>> 0699
>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 
>>>> 6902
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
> 
> 


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Queue performance from recent changes

Posted by Rob Davies <ra...@gmail.com>.
David,

you might like to try enabling the optimizeDispatch property on the  
Destination policy map - see http://activemq.apache.org/configure-version-5-brokers.html 
  from trunk, if you are using non-persistent messages

cheers,

Rob
On 6 Mar 2008, at 22:48, David Sitsky wrote:

> I am sure it will be application-dependent, so making it a policy  
> makes a lot of sense.  For my application, I only have a pending  
> size of 1 since each work item's processing requirements can vary  
> tremendously.
>
> Just curious - what kind of benchmarks did you run this against?   
> I'm curious to know what kind of performance degregation you saw..  
> it would be interesting to understand why.  I am using non- 
> persistent messaging, so perhaps that could make a difference, since  
> I am only paging a small number of messages in at a time.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> Yes - actually - I tried it a few days ago. I haven't committed it  
>> because message throughput is generally lower. I will look at  
>> making it optional via a destination policy
>> cheers,
>> Rob
>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I know its been a couple of weeks.  I've been using my changes for  
>>> a while and I see nice CPU and memory usage on the broker, and  
>>> good messaging performance for my application.  Have you had a  
>>> chance to try it out?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> thanks for the great feedback - will try your patch and see how  
>>>> it works!
>>>> cheers,
>>>> Rob
>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I like the new changes, but with the changes as they are, for my  
>>>>> application for one of my benchmarks, it takes twice as long to  
>>>>> complete.
>>>>>
>>>>> I believe the culprit for this is that when the new code can't  
>>>>> find a consumer which is not full, the broker chooses the  
>>>>> consumer with the lowest dispatch queue size.
>>>>>
>>>>> In my application, since I have a prefetch size of 1, and have  
>>>>> longish-running transactions, the dispatch queue size is not  
>>>>> indicative of the current load for that consumer.  As a result,  
>>>>> I think this is what is responsible for poor load-balancing in  
>>>>> my case.
>>>>>
>>>>> For applications which commit() after each processed message, I  
>>>>> am sure this wouldn't be the case.  In some ways, reverting to  
>>>>> the old behaviour of adding the pending message to all consumers  
>>>>> might lead to better load balancing with this code.
>>>>>
>>>>> However - I think it is better if the consumers can decide when  
>>>>> they want more messages rather than the broker pushing messages  
>>>>> at them? I've attached a patch which demonstrates this.  When  
>>>>> LAZY_DISPATCH is set to true (set via a system property for now  
>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>
>>>>> The basic idea is pageInMessages() only pages in the minimum  
>>>>> number of messages that can be dispatched immediately to non- 
>>>>> full consumers. Whenever a consumer acks a message, which  
>>>>> updates its prefetch size, we make sure Queue.wakeup() is called  
>>>>> so that the consumer will receive new messages.
>>>>>
>>>>> With this change in effect - I see slightly faster or almost the  
>>>>> same times with the previous benchmark.  However memory usage on  
>>>>> the broker is far better, as the pending queues for each  
>>>>> consumer is either 0 or very small.
>>>>>
>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>
>>>>> I am doing a large overnight run with 16 consumers, so we'll see  
>>>>> how the  performance goes.
>>>>>
>>>>> You'll also notice in the patch, that in Queue.addSubscriber(),  
>>>>> I thought there didn't seem to be any need for adding a message  
>>>>> to a new consumer if the message has already been locked by  
>>>>> another consumer?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> please let us know if these changes helps/hinders your app!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>> If what I said above is true, then the immediately above if  
>>>>>>>>> statement needs to be moved outside its enclosing if -  
>>>>>>>>> otherwise it only gets executed when targets != null.  We'd  
>>>>>>>>> want this to execute if we found a matching target wouldn't  
>>>>>>>>> we?
>>>>>>>> Don't think so? We only want the message going to  one  
>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>
>>>>>
>>>>> -- 
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>>>> 9280 0699
>>>>> Web: http://www.nuix.com                            Fax: +61 2  
>>>>> 9212 6902
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/PrefetchSubscription.java
>>>>> = 
>>>>> ==================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/PrefetchSubscription.java    (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/PrefetchSubscription.java    (working copy)
>>>>> @@ -160,6 +160,8 @@
>>>>>   public  void acknowledge(final ConnectionContext context,final  
>>>>> MessageAck ack) throws Exception {
>>>>>       // Handle the standard acknowledgment case.
>>>>>       boolean callDispatchMatched = false;
>>>>> +    Queue queue = null;
>>>>> +           synchronized(dispatchLock) {
>>>>>           if (ack.isStandardAck()) {
>>>>>               // Acknowledge all dispatched messages up till the  
>>>>> message id of
>>>>> @@ -223,8 +225,12 @@
>>>>>                               prefetchExtension = Math.max(0,
>>>>>                                       prefetchExtension - (index  
>>>>> + 1));
>>>>>                           }
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                           callDispatchMatched = true;
>>>>> -                            break;
>>>>> +                break;
>>>>>                       }
>>>>>                   }
>>>>>               }
>>>>> @@ -253,6 +259,10 @@
>>>>>                   if  
>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>                       prefetchExtension =  
>>>>> Math.max(prefetchExtension,
>>>>>                               index + 1);
>>>>> +                        if (queue == null)
>>>>> +                        {
>>>>> +                            queue =  
>>>>> (Queue)node.getRegionDestination();
>>>>> +                        }
>>>>>                       callDispatchMatched = true;
>>>>>                       break;
>>>>>                   }
>>>>> @@ -279,6 +289,10 @@
>>>>>                   if (inAckRange) {
>>>>>                       node.incrementRedeliveryCounter();
>>>>>                       if  
>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                           callDispatchMatched = true;
>>>>>                           break;
>>>>>                       }
>>>>> @@ -320,6 +334,10 @@
>>>>>                       if  
>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>>                           prefetchExtension = Math.max(0,  
>>>>> prefetchExtension
>>>>>                                   - (index + 1));
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                           callDispatchMatched = true;
>>>>>                           break;
>>>>>                       }
>>>>> @@ -336,6 +354,9 @@
>>>>>           }
>>>>>       }
>>>>>       if (callDispatchMatched) {
>>>>> +        if (Queue.LAZY_DISPATCH) {
>>>>> +        queue.wakeup();
>>>>> +        }
>>>>>           dispatchPending();
>>>>>       } else {
>>>>>           if (isSlave()) {
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/Queue.java
>>>>> = 
>>>>> ==================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/Queue.java    (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/Queue.java    (working copy)
>>>>> @@ -75,6 +75,8 @@
>>>>> * @version $Revision: 1.28 $
>>>>> */
>>>>> public class Queue extends BaseDestination implements Task {
>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>> +     
>>>>> Boolean 
>>>>> .parseBoolean(System.getProperty("activemq.lazy.dispatch",  
>>>>> "true"));
>>>>>   private final Log log;
>>>>>   private final List<Subscription> consumers = new  
>>>>> ArrayList<Subscription>(50);
>>>>>   private PendingMessageCursor messages;
>>>>> @@ -212,12 +214,12 @@
>>>>>           synchronized (pagedInMessages) {
>>>>>               // Add all the matching messages in the queue to the
>>>>>               // subscription.
>>>>> -
>>>>>               for (Iterator<MessageReference> i =  
>>>>> pagedInMessages.values()
>>>>>                       .iterator(); i.hasNext();) {
>>>>>                   QueueMessageReference node =  
>>>>> (QueueMessageReference) i
>>>>>                           .next();
>>>>> -                    if (!node.isDropped() && !node.isAcked() &&  
>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>> +                    if ((!node.isDropped() ||  
>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>> +            node.getLockOwner() == null) {
>>>>>                       msgContext.setMessageReference(node);
>>>>>                       if (sub.matches(node, msgContext)) {
>>>>>                           sub.add(node);
>>>>> @@ -940,7 +945,11 @@
>>>>>       dispatchLock.lock();
>>>>>       try{
>>>>>
>>>>> -            final int toPageIn = getMaxPageSize() -  
>>>>> pagedInMessages.size();
>>>>> +            int toPageIn = getMaxPageSize() -  
>>>>> pagedInMessages.size();
>>>>> +        if (LAZY_DISPATCH) {
>>>>> +        // Only page in the minimum number of messages which  
>>>>> can be dispatched immediately.
>>>>> +        toPageIn =  
>>>>> Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>> +        }
>>>>>           if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>               messages.setMaxBatchSize(toPageIn);
>>>>>               int count = 0;
>>>>> @@ -976,12 +985,25 @@
>>>>>       }
>>>>>       return result;
>>>>>   }
>>>>> +
>>>>> +    private int getConsumerMessageCountBeforeFull() throws  
>>>>> Exception {
>>>>> +    int total = 0;
>>>>> +        synchronized (consumers) {
>>>>> +            for (Subscription s : consumers) {
>>>>> +        if (s instanceof PrefetchSubscription) {
>>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>>> +        }
>>>>> +        }
>>>>> +    }
>>>>> +    return total;
>>>>> +    }
>>>>>
>>>>>   private void doDispatch(List<MessageReference> list) throws  
>>>>> Exception {
>>>>>
>>>>>       if (list != null) {
>>>>>           synchronized (consumers) {
>>>>>               for (MessageReference node : list) {
>>>>> +
>>>>>                   Subscription target = null;
>>>>>                   List<Subscription> targets = null;
>>>>>                   for (Subscription s : consumers) {
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>> 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2  
>>> 9212 6902
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902


Re: Queue performance from recent changes

Posted by David Sitsky <si...@nuix.com>.
I am sure it will be application-dependent, so making it a policy makes 
a lot of sense.  For my application, I only have a pending size of 1 
since each work item's processing requirements can vary tremendously.

Just curious - what kind of benchmarks did you run this against?  I'm 
curious to know what kind of performance degregation you saw.. it would 
be interesting to understand why.  I am using non-persistent messaging, 
so perhaps that could make a difference, since I am only paging a small 
number of messages in at a time.

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> Yes - actually - I tried it a few days ago. I haven't committed it 
> because message throughput is generally lower. I will look at making it 
> optional via a destination policy
> 
> cheers,
> 
> Rob
> On 6 Mar 2008, at 05:54, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I know its been a couple of weeks.  I've been using my changes for a 
>> while and I see nice CPU and memory usage on the broker, and good 
>> messaging performance for my application.  Have you had a chance to 
>> try it out?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> thanks for the great feedback - will try your patch and see how it 
>>> works!
>>> cheers,
>>> Rob
>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I like the new changes, but with the changes as they are, for my 
>>>> application for one of my benchmarks, it takes twice as long to 
>>>> complete.
>>>>
>>>> I believe the culprit for this is that when the new code can't find 
>>>> a consumer which is not full, the broker chooses the consumer with 
>>>> the lowest dispatch queue size.
>>>>
>>>> In my application, since I have a prefetch size of 1, and have 
>>>> longish-running transactions, the dispatch queue size is not 
>>>> indicative of the current load for that consumer.  As a result, I 
>>>> think this is what is responsible for poor load-balancing in my case.
>>>>
>>>> For applications which commit() after each processed message, I am 
>>>> sure this wouldn't be the case.  In some ways, reverting to the old 
>>>> behaviour of adding the pending message to all consumers might lead 
>>>> to better load balancing with this code.
>>>>
>>>> However - I think it is better if the consumers can decide when they 
>>>> want more messages rather than the broker pushing messages at them? 
>>>> I've attached a patch which demonstrates this.  When LAZY_DISPATCH 
>>>> is set to true (set via a system property for now for testing 
>>>> purposes) this changes the behaviour slightly.
>>>>
>>>> The basic idea is pageInMessages() only pages in the minimum number 
>>>> of messages that can be dispatched immediately to non-full 
>>>> consumers. Whenever a consumer acks a message, which updates its 
>>>> prefetch size, we make sure Queue.wakeup() is called so that the 
>>>> consumer will receive new messages.
>>>>
>>>> With this change in effect - I see slightly faster or almost the 
>>>> same times with the previous benchmark.  However memory usage on the 
>>>> broker is far better, as the pending queues for each consumer is 
>>>> either 0 or very small.
>>>>
>>>> What do you think?  I guess there are better ways of doing this.
>>>>
>>>> I am doing a large overnight run with 16 consumers, so we'll see how 
>>>> the  performance goes.
>>>>
>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>>>> thought there didn't seem to be any need for adding a message to a 
>>>> new consumer if the message has already been locked by another 
>>>> consumer?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> please let us know if these changes helps/hinders your app!
>>>>> cheers,
>>>>> Rob
>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>> If what I said above is true, then the immediately above if 
>>>>>>>> statement needs to be moved outside its enclosing if - otherwise 
>>>>>>>> it only gets executed when targets != null.  We'd want this to 
>>>>>>>> execute if we found a matching target wouldn't we?
>>>>>>> Don't think so? We only want the message going to  one 
>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>
>>>>
>>>> -- 
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 
>>>> 0699
>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 
>>>> 6902
>>>> Index: 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java 
>>>>
>>>> ===================================================================
>>>> --- 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>> (revision 628917)
>>>> +++ 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>> (working copy)
>>>> @@ -160,6 +160,8 @@
>>>>    public  void acknowledge(final ConnectionContext context,final 
>>>> MessageAck ack) throws Exception {
>>>>        // Handle the standard acknowledgment case.
>>>>        boolean callDispatchMatched = false;
>>>> +    Queue queue = null;
>>>> +           synchronized(dispatchLock) {
>>>>            if (ack.isStandardAck()) {
>>>>                // Acknowledge all dispatched messages up till the 
>>>> message id of
>>>> @@ -223,8 +225,12 @@
>>>>                                prefetchExtension = Math.max(0,
>>>>                                        prefetchExtension - (index + 
>>>> 1));
>>>>                            }
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>> -                            break;
>>>> +                break;
>>>>                        }
>>>>                    }
>>>>                }
>>>> @@ -253,6 +259,10 @@
>>>>                    if 
>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>                        prefetchExtension = Math.max(prefetchExtension,
>>>>                                index + 1);
>>>> +                        if (queue == null)
>>>> +                        {
>>>> +                            queue = 
>>>> (Queue)node.getRegionDestination();
>>>> +                        }
>>>>                        callDispatchMatched = true;
>>>>                        break;
>>>>                    }
>>>> @@ -279,6 +289,10 @@
>>>>                    if (inAckRange) {
>>>>                        node.incrementRedeliveryCounter();
>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>>                            break;
>>>>                        }
>>>> @@ -320,6 +334,10 @@
>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>>                            prefetchExtension = Math.max(0, 
>>>> prefetchExtension
>>>>                                    - (index + 1));
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>>                            break;
>>>>                        }
>>>> @@ -336,6 +354,9 @@
>>>>            }
>>>>        }
>>>>        if (callDispatchMatched) {
>>>> +        if (Queue.LAZY_DISPATCH) {
>>>> +        queue.wakeup();
>>>> +        }
>>>>            dispatchPending();
>>>>        } else {
>>>>            if (isSlave()) {
>>>> Index: 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java 
>>>>
>>>> ===================================================================
>>>> --- 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>> (revision 628917)
>>>> +++ 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>> (working copy)
>>>> @@ -75,6 +75,8 @@
>>>> * @version $Revision: 1.28 $
>>>> */
>>>> public class Queue extends BaseDestination implements Task {
>>>> +    public static final boolean LAZY_DISPATCH =
>>>> +    
>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>>>> "true"));
>>>>    private final Log log;
>>>>    private final List<Subscription> consumers = new 
>>>> ArrayList<Subscription>(50);
>>>>    private PendingMessageCursor messages;
>>>> @@ -212,12 +214,12 @@
>>>>            synchronized (pagedInMessages) {
>>>>                // Add all the matching messages in the queue to the
>>>>                // subscription.
>>>> -
>>>>                for (Iterator<MessageReference> i = 
>>>> pagedInMessages.values()
>>>>                        .iterator(); i.hasNext();) {
>>>>                    QueueMessageReference node = 
>>>> (QueueMessageReference) i
>>>>                            .next();
>>>> -                    if (!node.isDropped() && !node.isAcked() && 
>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>> +                    if ((!node.isDropped() || 
>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>> +            node.getLockOwner() == null) {
>>>>                        msgContext.setMessageReference(node);
>>>>                        if (sub.matches(node, msgContext)) {
>>>>                            sub.add(node);
>>>> @@ -940,7 +945,11 @@
>>>>        dispatchLock.lock();
>>>>        try{
>>>>
>>>> -            final int toPageIn = getMaxPageSize() - 
>>>> pagedInMessages.size();
>>>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>> +        if (LAZY_DISPATCH) {
>>>> +        // Only page in the minimum number of messages which can be 
>>>> dispatched immediately.
>>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>>>> toPageIn);
>>>> +        }
>>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>                messages.setMaxBatchSize(toPageIn);
>>>>                int count = 0;
>>>> @@ -976,12 +985,25 @@
>>>>        }
>>>>        return result;
>>>>    }
>>>> +
>>>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>>>> +    int total = 0;
>>>> +        synchronized (consumers) {
>>>> +            for (Subscription s : consumers) {
>>>> +        if (s instanceof PrefetchSubscription) {
>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>> +        }
>>>> +        }
>>>> +    }
>>>> +    return total;
>>>> +    }
>>>>
>>>>    private void doDispatch(List<MessageReference> list) throws 
>>>> Exception {
>>>>
>>>>        if (list != null) {
>>>>            synchronized (consumers) {
>>>>                for (MessageReference node : list) {
>>>> +
>>>>                    Subscription target = null;
>>>>                    List<Subscription> targets = null;
>>>>                    for (Subscription s : consumers) {
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Queue performance from recent changes

Posted by Rob Davies <ra...@gmail.com>.
Hi David,

Yes - actually - I tried it a few days ago. I haven't committed it  
because message throughput is generally lower. I will look at making  
it optional via a destination policy

cheers,

Rob
On 6 Mar 2008, at 05:54, David Sitsky wrote:

> Hi Rob,
>
> I know its been a couple of weeks.  I've been using my changes for a  
> while and I see nice CPU and memory usage on the broker, and good  
> messaging performance for my application.  Have you had a chance to  
> try it out?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> thanks for the great feedback - will try your patch and see how it  
>> works!
>> cheers,
>> Rob
>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I like the new changes, but with the changes as they are, for my  
>>> application for one of my benchmarks, it takes twice as long to  
>>> complete.
>>>
>>> I believe the culprit for this is that when the new code can't  
>>> find a consumer which is not full, the broker chooses the consumer  
>>> with the lowest dispatch queue size.
>>>
>>> In my application, since I have a prefetch size of 1, and have  
>>> longish-running transactions, the dispatch queue size is not  
>>> indicative of the current load for that consumer.  As a result, I  
>>> think this is what is responsible for poor load-balancing in my  
>>> case.
>>>
>>> For applications which commit() after each processed message, I am  
>>> sure this wouldn't be the case.  In some ways, reverting to the  
>>> old behaviour of adding the pending message to all consumers might  
>>> lead to better load balancing with this code.
>>>
>>> However - I think it is better if the consumers can decide when  
>>> they want more messages rather than the broker pushing messages at  
>>> them? I've attached a patch which demonstrates this.  When  
>>> LAZY_DISPATCH is set to true (set via a system property for now  
>>> for testing purposes) this changes the behaviour slightly.
>>>
>>> The basic idea is pageInMessages() only pages in the minimum  
>>> number of messages that can be dispatched immediately to non-full  
>>> consumers. Whenever a consumer acks a message, which updates its  
>>> prefetch size, we make sure Queue.wakeup() is called so that the  
>>> consumer will receive new messages.
>>>
>>> With this change in effect - I see slightly faster or almost the  
>>> same times with the previous benchmark.  However memory usage on  
>>> the broker is far better, as the pending queues for each consumer  
>>> is either 0 or very small.
>>>
>>> What do you think?  I guess there are better ways of doing this.
>>>
>>> I am doing a large overnight run with 16 consumers, so we'll see  
>>> how the  performance goes.
>>>
>>> You'll also notice in the patch, that in Queue.addSubscriber(), I  
>>> thought there didn't seem to be any need for adding a message to a  
>>> new consumer if the message has already been locked by another  
>>> consumer?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> please let us know if these changes helps/hinders your app!
>>>> cheers,
>>>> Rob
>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>> If what I said above is true, then the immediately above if  
>>>>>>> statement needs to be moved outside its enclosing if -  
>>>>>>> otherwise it only gets executed when targets != null.  We'd  
>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>> Don't think so? We only want the message going to  one  
>>>>>> subscription? I may have misunderstood what you mean!
>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>> 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2  
>>> 9212 6902
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>> region/PrefetchSubscription.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> PrefetchSubscription.java    (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> PrefetchSubscription.java    (working copy)
>>> @@ -160,6 +160,8 @@
>>>    public  void acknowledge(final ConnectionContext context,final  
>>> MessageAck ack) throws Exception {
>>>        // Handle the standard acknowledgment case.
>>>        boolean callDispatchMatched = false;
>>> +    Queue queue = null;
>>> +           synchronized(dispatchLock) {
>>>            if (ack.isStandardAck()) {
>>>                // Acknowledge all dispatched messages up till the  
>>> message id of
>>> @@ -223,8 +225,12 @@
>>>                                prefetchExtension = Math.max(0,
>>>                                        prefetchExtension - (index  
>>> + 1));
>>>                            }
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>> -                            break;
>>> +                break;
>>>                        }
>>>                    }
>>>                }
>>> @@ -253,6 +259,10 @@
>>>                    if  
>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>                        prefetchExtension =  
>>> Math.max(prefetchExtension,
>>>                                index + 1);
>>> +                        if (queue == null)
>>> +                        {
>>> +                            queue =  
>>> (Queue)node.getRegionDestination();
>>> +                        }
>>>                        callDispatchMatched = true;
>>>                        break;
>>>                    }
>>> @@ -279,6 +289,10 @@
>>>                    if (inAckRange) {
>>>                        node.incrementRedeliveryCounter();
>>>                        if  
>>> (ack.getLastMessageId().equals(messageId)) {
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>>                            break;
>>>                        }
>>> @@ -320,6 +334,10 @@
>>>                        if  
>>> (ack.getLastMessageId().equals(messageId)) {
>>>                            prefetchExtension = Math.max(0,  
>>> prefetchExtension
>>>                                    - (index + 1));
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>>                            break;
>>>                        }
>>> @@ -336,6 +354,9 @@
>>>            }
>>>        }
>>>        if (callDispatchMatched) {
>>> +        if (Queue.LAZY_DISPATCH) {
>>> +        queue.wakeup();
>>> +        }
>>>            dispatchPending();
>>>        } else {
>>>            if (isSlave()) {
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>> region/Queue.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> Queue.java    (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> Queue.java    (working copy)
>>> @@ -75,6 +75,8 @@
>>> * @version $Revision: 1.28 $
>>> */
>>> public class Queue extends BaseDestination implements Task {
>>> +    public static final boolean LAZY_DISPATCH =
>>> +     
>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",  
>>> "true"));
>>>    private final Log log;
>>>    private final List<Subscription> consumers = new  
>>> ArrayList<Subscription>(50);
>>>    private PendingMessageCursor messages;
>>> @@ -212,12 +214,12 @@
>>>            synchronized (pagedInMessages) {
>>>                // Add all the matching messages in the queue to the
>>>                // subscription.
>>> -
>>>                for (Iterator<MessageReference> i =  
>>> pagedInMessages.values()
>>>                        .iterator(); i.hasNext();) {
>>>                    QueueMessageReference node =  
>>> (QueueMessageReference) i
>>>                            .next();
>>> -                    if (!node.isDropped() && !node.isAcked() && (! 
>>> node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>> +                    if ((!node.isDropped() ||  
>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>> +            node.getLockOwner() == null) {
>>>                        msgContext.setMessageReference(node);
>>>                        if (sub.matches(node, msgContext)) {
>>>                            sub.add(node);
>>> @@ -940,7 +945,11 @@
>>>        dispatchLock.lock();
>>>        try{
>>>
>>> -            final int toPageIn = getMaxPageSize() -  
>>> pagedInMessages.size();
>>> +            int toPageIn = getMaxPageSize() -  
>>> pagedInMessages.size();
>>> +        if (LAZY_DISPATCH) {
>>> +        // Only page in the minimum number of messages which can  
>>> be dispatched immediately.
>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(),  
>>> toPageIn);
>>> +        }
>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>                messages.setMaxBatchSize(toPageIn);
>>>                int count = 0;
>>> @@ -976,12 +985,25 @@
>>>        }
>>>        return result;
>>>    }
>>> +
>>> +    private int getConsumerMessageCountBeforeFull() throws  
>>> Exception {
>>> +    int total = 0;
>>> +        synchronized (consumers) {
>>> +            for (Subscription s : consumers) {
>>> +        if (s instanceof PrefetchSubscription) {
>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>> +        }
>>> +        }
>>> +    }
>>> +    return total;
>>> +    }
>>>
>>>    private void doDispatch(List<MessageReference> list) throws  
>>> Exception {
>>>
>>>        if (list != null) {
>>>            synchronized (consumers) {
>>>                for (MessageReference node : list) {
>>> +
>>>                    Subscription target = null;
>>>                    List<Subscription> targets = null;
>>>                    for (Subscription s : consumers) {
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902


Re: Queue performance from recent changes

Posted by David Sitsky <si...@nuix.com>.
Hi Rob,

I've finally had some time to run some benchmarks with the trunk checked 
out today (637703) and the numbers look great!  Many thanks for checking 
in this code - I'm happy now that I don't have to maintain any of my own 
private modifications to activemq.

Thanks again.

Cheers,
David

David Sitsky wrote:
> Many thanks Rob - I'll try and do a fresh checkout today and let you 
> know how the performance looks using my standard benchmarks.
> 
> Cheers,
> David
> 
> Rob Davies wrote:
>> Hi David,
>>
>> the changes you suggested are now in and lazyDispatch can be set by a 
>> destination policy - its currently on by default
>>
>> cheers,
>>
>> Rob
>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>
>>> Hi Rob,
>>>
>>> I know its been a couple of weeks.  I've been using my changes for a 
>>> while and I see nice CPU and memory usage on the broker, and good 
>>> messaging performance for my application.  Have you had a chance to 
>>> try it out?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> thanks for the great feedback - will try your patch and see how it 
>>>> works!
>>>> cheers,
>>>> Rob
>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I like the new changes, but with the changes as they are, for my 
>>>>> application for one of my benchmarks, it takes twice as long to 
>>>>> complete.
>>>>>
>>>>> I believe the culprit for this is that when the new code can't find 
>>>>> a consumer which is not full, the broker chooses the consumer with 
>>>>> the lowest dispatch queue size.
>>>>>
>>>>> In my application, since I have a prefetch size of 1, and have 
>>>>> longish-running transactions, the dispatch queue size is not 
>>>>> indicative of the current load for that consumer.  As a result, I 
>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>
>>>>> For applications which commit() after each processed message, I am 
>>>>> sure this wouldn't be the case.  In some ways, reverting to the old 
>>>>> behaviour of adding the pending message to all consumers might lead 
>>>>> to better load balancing with this code.
>>>>>
>>>>> However - I think it is better if the consumers can decide when 
>>>>> they want more messages rather than the broker pushing messages at 
>>>>> them? I've attached a patch which demonstrates this.  When 
>>>>> LAZY_DISPATCH is set to true (set via a system property for now for 
>>>>> testing purposes) this changes the behaviour slightly.
>>>>>
>>>>> The basic idea is pageInMessages() only pages in the minimum number 
>>>>> of messages that can be dispatched immediately to non-full 
>>>>> consumers. Whenever a consumer acks a message, which updates its 
>>>>> prefetch size, we make sure Queue.wakeup() is called so that the 
>>>>> consumer will receive new messages.
>>>>>
>>>>> With this change in effect - I see slightly faster or almost the 
>>>>> same times with the previous benchmark.  However memory usage on 
>>>>> the broker is far better, as the pending queues for each consumer 
>>>>> is either 0 or very small.
>>>>>
>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>
>>>>> I am doing a large overnight run with 16 consumers, so we'll see 
>>>>> how the  performance goes.
>>>>>
>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>>>>> thought there didn't seem to be any need for adding a message to a 
>>>>> new consumer if the message has already been locked by another 
>>>>> consumer?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> please let us know if these changes helps/hinders your app!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>> If what I said above is true, then the immediately above if 
>>>>>>>>> statement needs to be moved outside its enclosing if - 
>>>>>>>>> otherwise it only gets executed when targets != null.  We'd 
>>>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>>>> Don't think so? We only want the message going to  one 
>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>
>>>>>
>>>>> -- 
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 
>>>>> 0699
>>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 
>>>>> 6902
>>>>> Index: 
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java 
>>>>>
>>>>> ===================================================================
>>>>> --- 
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>>> (revision 628917)
>>>>> +++ 
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>>> (working copy)
>>>>> @@ -160,6 +160,8 @@
>>>>>    public  void acknowledge(final ConnectionContext context,final 
>>>>> MessageAck ack) throws Exception {
>>>>>        // Handle the standard acknowledgment case.
>>>>>        boolean callDispatchMatched = false;
>>>>> +    Queue queue = null;
>>>>> +           synchronized(dispatchLock) {
>>>>>            if (ack.isStandardAck()) {
>>>>>                // Acknowledge all dispatched messages up till the 
>>>>> message id of
>>>>> @@ -223,8 +225,12 @@
>>>>>                                prefetchExtension = Math.max(0,
>>>>>                                        prefetchExtension - (index + 
>>>>> 1));
>>>>>                            }
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                            callDispatchMatched = true;
>>>>> -                            break;
>>>>> +                break;
>>>>>                        }
>>>>>                    }
>>>>>                }
>>>>> @@ -253,6 +259,10 @@
>>>>>                    if 
>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>                        prefetchExtension = Math.max(prefetchExtension,
>>>>>                                index + 1);
>>>>> +                        if (queue == null)
>>>>> +                        {
>>>>> +                            queue = 
>>>>> (Queue)node.getRegionDestination();
>>>>> +                        }
>>>>>                        callDispatchMatched = true;
>>>>>                        break;
>>>>>                    }
>>>>> @@ -279,6 +289,10 @@
>>>>>                    if (inAckRange) {
>>>>>                        node.incrementRedeliveryCounter();
>>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                            callDispatchMatched = true;
>>>>>                            break;
>>>>>                        }
>>>>> @@ -320,6 +334,10 @@
>>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>>>                            prefetchExtension = Math.max(0, 
>>>>> prefetchExtension
>>>>>                                    - (index + 1));
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                            callDispatchMatched = true;
>>>>>                            break;
>>>>>                        }
>>>>> @@ -336,6 +354,9 @@
>>>>>            }
>>>>>        }
>>>>>        if (callDispatchMatched) {
>>>>> +        if (Queue.LAZY_DISPATCH) {
>>>>> +        queue.wakeup();
>>>>> +        }
>>>>>            dispatchPending();
>>>>>        } else {
>>>>>            if (isSlave()) {
>>>>> Index: 
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java 
>>>>>
>>>>> ===================================================================
>>>>> --- 
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>>> (revision 628917)
>>>>> +++ 
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>>> (working copy)
>>>>> @@ -75,6 +75,8 @@
>>>>> * @version $Revision: 1.28 $
>>>>> */
>>>>> public class Queue extends BaseDestination implements Task {
>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>> +    
>>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>>>>> "true"));
>>>>>    private final Log log;
>>>>>    private final List<Subscription> consumers = new 
>>>>> ArrayList<Subscription>(50);
>>>>>    private PendingMessageCursor messages;
>>>>> @@ -212,12 +214,12 @@
>>>>>            synchronized (pagedInMessages) {
>>>>>                // Add all the matching messages in the queue to the
>>>>>                // subscription.
>>>>> -
>>>>>                for (Iterator<MessageReference> i = 
>>>>> pagedInMessages.values()
>>>>>                        .iterator(); i.hasNext();) {
>>>>>                    QueueMessageReference node = 
>>>>> (QueueMessageReference) i
>>>>>                            .next();
>>>>> -                    if (!node.isDropped() && !node.isAcked() && 
>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>> +                    if ((!node.isDropped() || 
>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>> +            node.getLockOwner() == null) {
>>>>>                        msgContext.setMessageReference(node);
>>>>>                        if (sub.matches(node, msgContext)) {
>>>>>                            sub.add(node);
>>>>> @@ -940,7 +945,11 @@
>>>>>        dispatchLock.lock();
>>>>>        try{
>>>>>
>>>>> -            final int toPageIn = getMaxPageSize() - 
>>>>> pagedInMessages.size();
>>>>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>>> +        if (LAZY_DISPATCH) {
>>>>> +        // Only page in the minimum number of messages which can 
>>>>> be dispatched immediately.
>>>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>>>>> toPageIn);
>>>>> +        }
>>>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>                messages.setMaxBatchSize(toPageIn);
>>>>>                int count = 0;
>>>>> @@ -976,12 +985,25 @@
>>>>>        }
>>>>>        return result;
>>>>>    }
>>>>> +
>>>>> +    private int getConsumerMessageCountBeforeFull() throws 
>>>>> Exception {
>>>>> +    int total = 0;
>>>>> +        synchronized (consumers) {
>>>>> +            for (Subscription s : consumers) {
>>>>> +        if (s instanceof PrefetchSubscription) {
>>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>>> +        }
>>>>> +        }
>>>>> +    }
>>>>> +    return total;
>>>>> +    }
>>>>>
>>>>>    private void doDispatch(List<MessageReference> list) throws 
>>>>> Exception {
>>>>>
>>>>>        if (list != null) {
>>>>>            synchronized (consumers) {
>>>>>                for (MessageReference node : list) {
>>>>> +
>>>>>                    Subscription target = null;
>>>>>                    List<Subscription> targets = null;
>>>>>                    for (Subscription s : consumers) {
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
> 
> 


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Queue performance from recent changes

Posted by David Sitsky <si...@nuix.com>.
Many thanks Rob - I'll try and do a fresh checkout today and let you 
know how the performance looks using my standard benchmarks.

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> the changes you suggested are now in and lazyDispatch can be set by a 
> destination policy - its currently on by default
> 
> cheers,
> 
> Rob
> On 6 Mar 2008, at 05:54, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I know its been a couple of weeks.  I've been using my changes for a 
>> while and I see nice CPU and memory usage on the broker, and good 
>> messaging performance for my application.  Have you had a chance to 
>> try it out?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> thanks for the great feedback - will try your patch and see how it 
>>> works!
>>> cheers,
>>> Rob
>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I like the new changes, but with the changes as they are, for my 
>>>> application for one of my benchmarks, it takes twice as long to 
>>>> complete.
>>>>
>>>> I believe the culprit for this is that when the new code can't find 
>>>> a consumer which is not full, the broker chooses the consumer with 
>>>> the lowest dispatch queue size.
>>>>
>>>> In my application, since I have a prefetch size of 1, and have 
>>>> longish-running transactions, the dispatch queue size is not 
>>>> indicative of the current load for that consumer.  As a result, I 
>>>> think this is what is responsible for poor load-balancing in my case.
>>>>
>>>> For applications which commit() after each processed message, I am 
>>>> sure this wouldn't be the case.  In some ways, reverting to the old 
>>>> behaviour of adding the pending message to all consumers might lead 
>>>> to better load balancing with this code.
>>>>
>>>> However - I think it is better if the consumers can decide when they 
>>>> want more messages rather than the broker pushing messages at them? 
>>>> I've attached a patch which demonstrates this.  When LAZY_DISPATCH 
>>>> is set to true (set via a system property for now for testing 
>>>> purposes) this changes the behaviour slightly.
>>>>
>>>> The basic idea is pageInMessages() only pages in the minimum number 
>>>> of messages that can be dispatched immediately to non-full 
>>>> consumers. Whenever a consumer acks a message, which updates its 
>>>> prefetch size, we make sure Queue.wakeup() is called so that the 
>>>> consumer will receive new messages.
>>>>
>>>> With this change in effect - I see slightly faster or almost the 
>>>> same times with the previous benchmark.  However memory usage on the 
>>>> broker is far better, as the pending queues for each consumer is 
>>>> either 0 or very small.
>>>>
>>>> What do you think?  I guess there are better ways of doing this.
>>>>
>>>> I am doing a large overnight run with 16 consumers, so we'll see how 
>>>> the  performance goes.
>>>>
>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>>>> thought there didn't seem to be any need for adding a message to a 
>>>> new consumer if the message has already been locked by another 
>>>> consumer?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> please let us know if these changes helps/hinders your app!
>>>>> cheers,
>>>>> Rob
>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>> If what I said above is true, then the immediately above if 
>>>>>>>> statement needs to be moved outside its enclosing if - otherwise 
>>>>>>>> it only gets executed when targets != null.  We'd want this to 
>>>>>>>> execute if we found a matching target wouldn't we?
>>>>>>> Don't think so? We only want the message going to  one 
>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>
>>>>
>>>> -- 
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 
>>>> 0699
>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 
>>>> 6902
>>>> Index: 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java 
>>>>
>>>> ===================================================================
>>>> --- 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>> (revision 628917)
>>>> +++ 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    
>>>> (working copy)
>>>> @@ -160,6 +160,8 @@
>>>>    public  void acknowledge(final ConnectionContext context,final 
>>>> MessageAck ack) throws Exception {
>>>>        // Handle the standard acknowledgment case.
>>>>        boolean callDispatchMatched = false;
>>>> +    Queue queue = null;
>>>> +           synchronized(dispatchLock) {
>>>>            if (ack.isStandardAck()) {
>>>>                // Acknowledge all dispatched messages up till the 
>>>> message id of
>>>> @@ -223,8 +225,12 @@
>>>>                                prefetchExtension = Math.max(0,
>>>>                                        prefetchExtension - (index + 
>>>> 1));
>>>>                            }
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>> -                            break;
>>>> +                break;
>>>>                        }
>>>>                    }
>>>>                }
>>>> @@ -253,6 +259,10 @@
>>>>                    if 
>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>                        prefetchExtension = Math.max(prefetchExtension,
>>>>                                index + 1);
>>>> +                        if (queue == null)
>>>> +                        {
>>>> +                            queue = 
>>>> (Queue)node.getRegionDestination();
>>>> +                        }
>>>>                        callDispatchMatched = true;
>>>>                        break;
>>>>                    }
>>>> @@ -279,6 +289,10 @@
>>>>                    if (inAckRange) {
>>>>                        node.incrementRedeliveryCounter();
>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>>                            break;
>>>>                        }
>>>> @@ -320,6 +334,10 @@
>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>>                            prefetchExtension = Math.max(0, 
>>>> prefetchExtension
>>>>                                    - (index + 1));
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>>                            break;
>>>>                        }
>>>> @@ -336,6 +354,9 @@
>>>>            }
>>>>        }
>>>>        if (callDispatchMatched) {
>>>> +        if (Queue.LAZY_DISPATCH) {
>>>> +        queue.wakeup();
>>>> +        }
>>>>            dispatchPending();
>>>>        } else {
>>>>            if (isSlave()) {
>>>> Index: 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java 
>>>>
>>>> ===================================================================
>>>> --- 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>> (revision 628917)
>>>> +++ 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>>>> (working copy)
>>>> @@ -75,6 +75,8 @@
>>>> * @version $Revision: 1.28 $
>>>> */
>>>> public class Queue extends BaseDestination implements Task {
>>>> +    public static final boolean LAZY_DISPATCH =
>>>> +    
>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>>>> "true"));
>>>>    private final Log log;
>>>>    private final List<Subscription> consumers = new 
>>>> ArrayList<Subscription>(50);
>>>>    private PendingMessageCursor messages;
>>>> @@ -212,12 +214,12 @@
>>>>            synchronized (pagedInMessages) {
>>>>                // Add all the matching messages in the queue to the
>>>>                // subscription.
>>>> -
>>>>                for (Iterator<MessageReference> i = 
>>>> pagedInMessages.values()
>>>>                        .iterator(); i.hasNext();) {
>>>>                    QueueMessageReference node = 
>>>> (QueueMessageReference) i
>>>>                            .next();
>>>> -                    if (!node.isDropped() && !node.isAcked() && 
>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>> +                    if ((!node.isDropped() || 
>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>> +            node.getLockOwner() == null) {
>>>>                        msgContext.setMessageReference(node);
>>>>                        if (sub.matches(node, msgContext)) {
>>>>                            sub.add(node);
>>>> @@ -940,7 +945,11 @@
>>>>        dispatchLock.lock();
>>>>        try{
>>>>
>>>> -            final int toPageIn = getMaxPageSize() - 
>>>> pagedInMessages.size();
>>>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>> +        if (LAZY_DISPATCH) {
>>>> +        // Only page in the minimum number of messages which can be 
>>>> dispatched immediately.
>>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>>>> toPageIn);
>>>> +        }
>>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>                messages.setMaxBatchSize(toPageIn);
>>>>                int count = 0;
>>>> @@ -976,12 +985,25 @@
>>>>        }
>>>>        return result;
>>>>    }
>>>> +
>>>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>>>> +    int total = 0;
>>>> +        synchronized (consumers) {
>>>> +            for (Subscription s : consumers) {
>>>> +        if (s instanceof PrefetchSubscription) {
>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>> +        }
>>>> +        }
>>>> +    }
>>>> +    return total;
>>>> +    }
>>>>
>>>>    private void doDispatch(List<MessageReference> list) throws 
>>>> Exception {
>>>>
>>>>        if (list != null) {
>>>>            synchronized (consumers) {
>>>>                for (MessageReference node : list) {
>>>> +
>>>>                    Subscription target = null;
>>>>                    List<Subscription> targets = null;
>>>>                    for (Subscription s : consumers) {
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Re: Queue performance from recent changes

Posted by Rob Davies <ra...@gmail.com>.
Hi David,

the changes you suggested are now in and lazyDispatch can be set by a  
destination policy - its currently on by default

cheers,

Rob
On 6 Mar 2008, at 05:54, David Sitsky wrote:

> Hi Rob,
>
> I know its been a couple of weeks.  I've been using my changes for a  
> while and I see nice CPU and memory usage on the broker, and good  
> messaging performance for my application.  Have you had a chance to  
> try it out?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> thanks for the great feedback - will try your patch and see how it  
>> works!
>> cheers,
>> Rob
>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I like the new changes, but with the changes as they are, for my  
>>> application for one of my benchmarks, it takes twice as long to  
>>> complete.
>>>
>>> I believe the culprit for this is that when the new code can't  
>>> find a consumer which is not full, the broker chooses the consumer  
>>> with the lowest dispatch queue size.
>>>
>>> In my application, since I have a prefetch size of 1, and have  
>>> longish-running transactions, the dispatch queue size is not  
>>> indicative of the current load for that consumer.  As a result, I  
>>> think this is what is responsible for poor load-balancing in my  
>>> case.
>>>
>>> For applications which commit() after each processed message, I am  
>>> sure this wouldn't be the case.  In some ways, reverting to the  
>>> old behaviour of adding the pending message to all consumers might  
>>> lead to better load balancing with this code.
>>>
>>> However - I think it is better if the consumers can decide when  
>>> they want more messages rather than the broker pushing messages at  
>>> them? I've attached a patch which demonstrates this.  When  
>>> LAZY_DISPATCH is set to true (set via a system property for now  
>>> for testing purposes) this changes the behaviour slightly.
>>>
>>> The basic idea is pageInMessages() only pages in the minimum  
>>> number of messages that can be dispatched immediately to non-full  
>>> consumers. Whenever a consumer acks a message, which updates its  
>>> prefetch size, we make sure Queue.wakeup() is called so that the  
>>> consumer will receive new messages.
>>>
>>> With this change in effect - I see slightly faster or almost the  
>>> same times with the previous benchmark.  However memory usage on  
>>> the broker is far better, as the pending queues for each consumer  
>>> is either 0 or very small.
>>>
>>> What do you think?  I guess there are better ways of doing this.
>>>
>>> I am doing a large overnight run with 16 consumers, so we'll see  
>>> how the  performance goes.
>>>
>>> You'll also notice in the patch, that in Queue.addSubscriber(), I  
>>> thought there didn't seem to be any need for adding a message to a  
>>> new consumer if the message has already been locked by another  
>>> consumer?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> please let us know if these changes helps/hinders your app!
>>>> cheers,
>>>> Rob
>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>> If what I said above is true, then the immediately above if  
>>>>>>> statement needs to be moved outside its enclosing if -  
>>>>>>> otherwise it only gets executed when targets != null.  We'd  
>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>> Don't think so? We only want the message going to  one  
>>>>>> subscription? I may have misunderstood what you mean!
>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>> 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2  
>>> 9212 6902
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>> region/PrefetchSubscription.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> PrefetchSubscription.java    (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> PrefetchSubscription.java    (working copy)
>>> @@ -160,6 +160,8 @@
>>>    public  void acknowledge(final ConnectionContext context,final  
>>> MessageAck ack) throws Exception {
>>>        // Handle the standard acknowledgment case.
>>>        boolean callDispatchMatched = false;
>>> +    Queue queue = null;
>>> +           synchronized(dispatchLock) {
>>>            if (ack.isStandardAck()) {
>>>                // Acknowledge all dispatched messages up till the  
>>> message id of
>>> @@ -223,8 +225,12 @@
>>>                                prefetchExtension = Math.max(0,
>>>                                        prefetchExtension - (index  
>>> + 1));
>>>                            }
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>> -                            break;
>>> +                break;
>>>                        }
>>>                    }
>>>                }
>>> @@ -253,6 +259,10 @@
>>>                    if  
>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>                        prefetchExtension =  
>>> Math.max(prefetchExtension,
>>>                                index + 1);
>>> +                        if (queue == null)
>>> +                        {
>>> +                            queue =  
>>> (Queue)node.getRegionDestination();
>>> +                        }
>>>                        callDispatchMatched = true;
>>>                        break;
>>>                    }
>>> @@ -279,6 +289,10 @@
>>>                    if (inAckRange) {
>>>                        node.incrementRedeliveryCounter();
>>>                        if  
>>> (ack.getLastMessageId().equals(messageId)) {
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>>                            break;
>>>                        }
>>> @@ -320,6 +334,10 @@
>>>                        if  
>>> (ack.getLastMessageId().equals(messageId)) {
>>>                            prefetchExtension = Math.max(0,  
>>> prefetchExtension
>>>                                    - (index + 1));
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>>                            break;
>>>                        }
>>> @@ -336,6 +354,9 @@
>>>            }
>>>        }
>>>        if (callDispatchMatched) {
>>> +        if (Queue.LAZY_DISPATCH) {
>>> +        queue.wakeup();
>>> +        }
>>>            dispatchPending();
>>>        } else {
>>>            if (isSlave()) {
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>> region/Queue.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> Queue.java    (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> Queue.java    (working copy)
>>> @@ -75,6 +75,8 @@
>>> * @version $Revision: 1.28 $
>>> */
>>> public class Queue extends BaseDestination implements Task {
>>> +    public static final boolean LAZY_DISPATCH =
>>> +     
>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",  
>>> "true"));
>>>    private final Log log;
>>>    private final List<Subscription> consumers = new  
>>> ArrayList<Subscription>(50);
>>>    private PendingMessageCursor messages;
>>> @@ -212,12 +214,12 @@
>>>            synchronized (pagedInMessages) {
>>>                // Add all the matching messages in the queue to the
>>>                // subscription.
>>> -
>>>                for (Iterator<MessageReference> i =  
>>> pagedInMessages.values()
>>>                        .iterator(); i.hasNext();) {
>>>                    QueueMessageReference node =  
>>> (QueueMessageReference) i
>>>                            .next();
>>> -                    if (!node.isDropped() && !node.isAcked() && (! 
>>> node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>> +                    if ((!node.isDropped() ||  
>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>> +            node.getLockOwner() == null) {
>>>                        msgContext.setMessageReference(node);
>>>                        if (sub.matches(node, msgContext)) {
>>>                            sub.add(node);
>>> @@ -940,7 +945,11 @@
>>>        dispatchLock.lock();
>>>        try{
>>>
>>> -            final int toPageIn = getMaxPageSize() -  
>>> pagedInMessages.size();
>>> +            int toPageIn = getMaxPageSize() -  
>>> pagedInMessages.size();
>>> +        if (LAZY_DISPATCH) {
>>> +        // Only page in the minimum number of messages which can  
>>> be dispatched immediately.
>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(),  
>>> toPageIn);
>>> +        }
>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>                messages.setMaxBatchSize(toPageIn);
>>>                int count = 0;
>>> @@ -976,12 +985,25 @@
>>>        }
>>>        return result;
>>>    }
>>> +
>>> +    private int getConsumerMessageCountBeforeFull() throws  
>>> Exception {
>>> +    int total = 0;
>>> +        synchronized (consumers) {
>>> +            for (Subscription s : consumers) {
>>> +        if (s instanceof PrefetchSubscription) {
>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>> +        }
>>> +        }
>>> +    }
>>> +    return total;
>>> +    }
>>>
>>>    private void doDispatch(List<MessageReference> list) throws  
>>> Exception {
>>>
>>>        if (list != null) {
>>>            synchronized (consumers) {
>>>                for (MessageReference node : list) {
>>> +
>>>                    Subscription target = null;
>>>                    List<Subscription> targets = null;
>>>                    for (Subscription s : consumers) {
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902