Posted to dev@qpid.apache.org by Carl Trieloff <cc...@redhat.com> on 2009/11/06 20:30:19 UTC

Remove linear queue functions

I created a patch which seems to work well. It targets querying the
queue, count, and acquire, making queue access faster for large queues
(best case 1 step {if no requeue or acquire}, worst case a binary
search). In most cases it is faster than binary search even if requeue
or a selector is used.

It does require that the re-queue order be corrected - which should be 
done regardless.

The remaining function that could use similar treatment is
Queue::seek().

Any thoughts on the patch? It opens the way for reasonable
selector performance.
Carl.
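
The lookup idea described above can be sketched in isolation. This is a
minimal illustration, not the patch itself: Msg is a hypothetical stand-in
for QueuedMessage, positions are plain 32-bit integers, and the container
is assumed to be sorted by position with no duplicates. Because positions
only increase, a message at position pos can be at most
(pos - front position) slots from the front, so the binary search only
needs to cover that prefix.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for the broker's QueuedMessage; only the position matters here.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

// Returns an iterator to the message at `pos`, or messages.end() if there is none.
// Assumes `messages` is sorted by position with no duplicates.
std::deque<Msg>::const_iterator findAt(const std::deque<Msg>& messages, std::uint32_t pos) {
    if (messages.empty()) return messages.end();

    // Debug-only check of the precondition (compiled out when NDEBUG is defined).
    assert(std::is_sorted(messages.begin(), messages.end()));

    // Unsigned subtraction wraps to a large value when pos is below the front
    // position; the bound then falls back to the whole container.
    const std::uint32_t diff = pos - messages.front().position;
    const std::size_t limit =
        diff < messages.size() ? static_cast<std::size_t>(diff) + 1 : messages.size();

    const Msg key = {pos};
    const std::deque<Msg>::const_iterator last =
        messages.begin() + static_cast<std::ptrdiff_t>(limit);
    const std::deque<Msg>::const_iterator i = std::lower_bound(messages.begin(), last, key);

    // lower_bound may return `last`, so only dereference inside the searched range.
    if (i != last && i->position == pos) return i;
    return messages.end();
}
```

When nothing has been acquired or requeued out of order, the searched
prefix is short for positions near the head of the queue, which is where
consumers normally operate.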


Re: Remove linear queue functions

Posted by Carl Trieloff <cc...@redhat.com>.
Yes, they are in the head of the trunk.

Gordon's patch now makes sure that messages are put back onto the queue
in order of sequence number. I believe this is the part that made life
hard for you last time round.

Carl.
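
For readers who have not seen Gordon's patch, the ordering property can be
sketched as follows. This is not his code: Msg and requeueInOrder are
hypothetical names, and positions are compared as plain 32-bit integers,
so sequence-number wrap-around is ignored.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for the broker's QueuedMessage.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

// Puts a released message back so that the deque stays sorted by position.
void requeueInOrder(std::deque<Msg>& messages, const Msg& msg) {
    // Find the first queued message that does not sort before msg
    // and insert directly in front of it.
    const std::deque<Msg>::iterator at =
        std::lower_bound(messages.begin(), messages.end(), msg);
    messages.insert(at, msg);

    // Debug-only check of the invariant that position lookups depend on.
    assert(std::is_sorted(messages.begin(), messages.end()));
}
```

Keeping the container sorted on requeue is what allows a binary search by
position to be used at all; a requeue that simply pushed to either end
would break that assumption.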

chenta lee wrote:
> Carl,
> Did those patches merge into the current source tree? And could you 
> tell me which patches they are?
>
> Chenta 
>
> On Wed, Nov 25, 2009 at 9:49 PM, Carl Trieloff <cctrieloff@redhat.com> wrote:
>
>
>     Chenta,
>
>     Two things to note, Gordon has now put a patch in that corrects
>     the order of the messages on requeue, i.e. they now stay in order
>     by sequence number even after rollback. Thus with his patch and my
>     seek() seekAt() we should now be able to resolve the acquire case
>     quite easily and update the position.
>
>     Take a look and let me know if you need some help with that.
>
>     (i.e. before these two patches it would not have been possible,
>     but now I believe it is)
>
>     Carl.
>
>
>
>     chenta lee wrote:
>>     I did the consumer sequence number wrap-around because, when we
>>     requeue a message, I cannot know which consumer consumed it.
>>     Therefore, I cannot update the consumer's sequence number when
>>     messages are rolled back. The consequence is that when consumers
>>     roll back (requeue) messages, they cannot acquire them anymore
>>     (because requeue_msg.position is always larger than
>>     consumer.position).
>>
>>     However, my patch is not that dirty :), I didn't change the
>>     original algorithm. We do not update the consumer sequence in
>>     consumeNextMessage at the very beginning. From my point of view,
>>     the only concern is that users who decide to use a selector on
>>     their messages might suffer from performance issues; other users
>>     who do not use a selector will be just fine.
>>
>>     Chenta
>>
>>     On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff
>>     <cctrieloff@redhat.com> wrote:
>>
>>
>>         Thanks, the one remaining issue I know of with the selector
>>         patch is that consumer
>>         sequence number wrap-around I don't think works.
>>
>>         We need a test there and maybe change the comp operators in
>>         your patch. I was looking into
>>         that last week on the selector patch, I'm itching to get the
>>         patch in.
>>
>>         Carl.
>>
>>
>>
>>         chenta lee wrote:
>>>         Hi Carl,
>>>         This patch looks great, I will update the selector patch later.
>>>
>>>         Chenta
>>>
>>>         On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff
>>>         <cctrieloff@redhat.com> wrote:
>>>
>>>
>>>             I created a patch which seems to work well. It targets
>>>             querying the queue, count, and acquire, making queue
>>>             access faster for large queues (best case 1 step {if no
>>>             requeue or acquire}, worst case a binary search). In
>>>             most cases it is faster than binary search even if
>>>             requeue or a selector is used.
>>>
>>>             It does require that the re-queue order be corrected -
>>>             which should be done regardless.
>>>
>>>             The remaining function that could use similar treatment
>>>             is Queue::seek().
>>>
>>>             Any thoughts on the patch? It opens the way for
>>>             reasonable selector performance.
>>>             Carl.
>>>
>>>
>>>             Index: qpid/broker/Queue.cpp
>>>             ===================================================================
>>>             --- qpid/broker/Queue.cpp       (revision 833135)
>>>             +++ qpid/broker/Queue.cpp       (working copy)
>>>             @@ -243,18 +243,18 @@
>>>              {
>>>                 Mutex::ScopedLock locker(messageLock);
>>>                 QPID_LOG(debug, "Attempting to acquire message at "
>>>             << position);
>>>             -    for (Messages::iterator i = messages.begin(); i !=
>>>             messages.end(); i++) {
>>>             -        if (i->position == position) {
>>>             -            message = *i;
>>>             -            if (lastValueQueue) {
>>>             -                clearLVQIndex(*i);
>>>             -            }
>>>             -            QPID_LOG(debug,
>>>             -                     "Acquired message at " <<
>>>             i->position << " from " << name);
>>>             -            messages.erase(i);
>>>             -            return true;
>>>             +
>>>             +    Messages::iterator i = findAt(position);
>>>             +    if (i != messages.end() ) {
>>>             +        message = *i;
>>>             +        if (lastValueQueue) {
>>>             +            clearLVQIndex(*i);
>>>                     }
>>>             -    }
>>>             +        QPID_LOG(debug,
>>>             +                 "Acquired message at " << i->position
>>>             << " from " << name);
>>>             +        messages.erase(i);
>>>             +        return true;
>>>             +    }
>>>                 QPID_LOG(debug, "Could not acquire message at " <<
>>>             position << " from " << name << "; no message at that
>>>             position");
>>>                 return false;
>>>              }
>>>             @@ -262,21 +262,21 @@
>>>              bool Queue::acquire(const QueuedMessage& msg) {
>>>                 Mutex::ScopedLock locker(messageLock);
>>>                 QPID_LOG(debug, "attempting to acquire " <<
>>>             msg.position);
>>>             -    for (Messages::iterator i = messages.begin(); i !=
>>>             messages.end(); i++) {
>>>             -        if ((i->position == msg.position &&
>>>             !lastValueQueue) // note that in some cases payload not
>>>             be set
>>>             -            || (lastValueQueue && (i->position ==
>>>             msg.position) &&
>>>             -                msg.payload.get() ==
>>>             checkLvqReplace(*i).payload.get()) )  {
>>>             +    Messages::iterator i = findAt(msg.position);
>>>             +    if (i != messages.end() && // note that in some cases payload may not be set
>>>             +        (!lastValueQueue ||
>>>             +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>>>
>>>             -            clearLVQIndex(msg);
>>>             -            QPID_LOG(debug,
>>>             -                     "Match found, acquire succeeded: " <<
>>>             -                     i->position << " == " <<
>>>             msg.position);
>>>             -            messages.erase(i);
>>>             -            return true;
>>>             -        } else {
>>>             -            QPID_LOG(debug, "No match: " << i->position
>>>             << " != " << msg.position);
>>>             -        }
>>>             +        clearLVQIndex(msg);
>>>             +        QPID_LOG(debug,
>>>             +                 "Match found, acquire succeeded: " <<
>>>             +                 i->position << " == " << msg.position);
>>>             +        messages.erase(i);
>>>             +        return true;
>>>             +    } else {
>>>             +        QPID_LOG(debug, "No match for " << msg.position);
>>>                 }
>>>             +
>>>                 QPID_LOG(debug, "Acquire failed for " << msg.position);
>>>                 return false;
>>>              }
>>>             @@ -445,19 +445,35 @@
>>>                 return false;
>>>              }
>>>
>>>             -namespace {
>>>             -struct PositionEquals {
>>>             -    SequenceNumber pos;
>>>             -    PositionEquals(SequenceNumber p) : pos(p) {}
>>>             -    bool operator()(const QueuedMessage& msg) const {
>>>             return msg.position == pos; }
>>>             -};
>>>             -}// namespace
>>>             +Queue::Messages::iterator Queue::findAt(SequenceNumber
>>>             pos) {
>>>
>>>             +    if(!messages.empty()){
>>>             +        QueuedMessage compM;
>>>             +        compM.position = pos;
>>>             +        unsigned long diff = pos.getValue() -
>>>             messages.front().position.getValue();
>>>             +        long maxEnd = diff < messages.size()? diff :
>>>             messages.size();
>>>             +
>>>             +        Messages::iterator i =
>>>             lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>>             +        if (i != messages.end() && i->position == pos)
>>>             +            return i;
>>>             +    }
>>>             +    return messages.end(); // no match found.
>>>             +}
>>>             +
>>>             +
>>>              QueuedMessage Queue::find(SequenceNumber pos) const {
>>>             +
>>>                 Mutex::ScopedLock locker(messageLock);
>>>             -    Messages::const_iterator i =
>>>             std::find_if(messages.begin(), messages.end(),
>>>             PositionEquals(pos));
>>>             -    if (i != messages.end())
>>>             -        return *i;
>>>             +    if(!messages.empty()){
>>>             +        QueuedMessage compM;
>>>             +        compM.position = pos;
>>>             +        unsigned long diff = pos.getValue() -
>>>             messages.front().position.getValue();
>>>             +        long maxEnd = diff < messages.size()? diff :
>>>             messages.size();
>>>             +
>>>             +        Messages::const_iterator i =
>>>             lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>>             +        if (i != messages.end() && i->position == pos)
>>>             +            return *i;
>>>             +    }
>>>                 return QueuedMessage();
>>>              }
>>>
>>>             @@ -642,10 +658,9 @@
>>>              }
>>>
>>>              /** function only provided for unit tests, or code not
>>>             in critical message path */
>>>             -uint32_t Queue::getMessageCount() const
>>>             +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>>              {
>>>                 Mutex::ScopedLock locker(messageLock);
>>>             -
>>>                 uint32_t count = 0;
>>>                 for ( Messages::const_iterator i = messages.begin();
>>>             i != messages.end(); ++i ) {
>>>                     //NOTE: don't need to use checkLvqReplace() here
>>>             as it
>>>             @@ -657,6 +672,12 @@
>>>                 return count;
>>>              }
>>>
>>>             +uint32_t Queue::getMessageCount() const
>>>             +{
>>>             +    Mutex::ScopedLock locker(messageLock);
>>>             +    return messages.size();
>>>             +}
>>>             +
>>>              uint32_t Queue::getConsumerCount() const
>>>              {
>>>                 Mutex::ScopedLock locker(consumerLock);
>>>             Index: qpid/broker/QueuedMessage.h
>>>             ===================================================================
>>>             --- qpid/broker/QueuedMessage.h (revision 833135)
>>>             +++ qpid/broker/QueuedMessage.h (working copy)
>>>             @@ -38,7 +38,9 @@
>>>                 QueuedMessage(Queue* q,
>>>             boost::intrusive_ptr<Message> msg,
>>>             framing::SequenceNumber sn) :
>>>                     payload(msg), position(sn), queue(q) {}
>>>                 QueuedMessage(Queue* q) : queue(q) {}
>>>             +
>>>              };
>>>             +    inline bool operator<(const QueuedMessage& a, const
>>>             QueuedMessage& b) { return a.position < b.position; }
>>>
>>>              }}
>>>
>>>             Index: qpid/broker/Queue.h
>>>             ===================================================================
>>>             --- qpid/broker/Queue.h (revision 833135)
>>>             +++ qpid/broker/Queue.h (working copy)
>>>             @@ -148,6 +148,8 @@
>>>                                 }
>>>                             }
>>>                         }
>>>             +
>>>             +            Messages::iterator
>>>             findAt(framing::SequenceNumber pos);
>>>
>>>                     public:
>>>
>>>             @@ -221,6 +223,7 @@
>>>                         uint32_t move(const Queue::shared_ptr destq,
>>>             uint32_t qty);
>>>
>>>                         QPID_BROKER_EXTERN uint32_t
>>>             getMessageCount() const;
>>>             +            QPID_BROKER_EXTERN uint32_t
>>>             getEnqueueCompleteMessageCount() const;
>>>                         QPID_BROKER_EXTERN uint32_t
>>>             getConsumerCount() const;
>>>                         inline const string& getName() const {
>>>             return name; }
>>>                         bool isExclusiveOwner(const OwnershipToken*
>>>             const o) const;
>>>             Index: tests/QueueTest.cpp
>>>             ===================================================================
>>>             --- tests/QueueTest.cpp (revision 833135)
>>>             +++ tests/QueueTest.cpp (working copy)
>>>             @@ -120,9 +120,10 @@
>>>                 queue->process(msg1);
>>>                 sleep(2);
>>>                 uint32_t compval=0;
>>>             -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>>             +    BOOST_CHECK_EQUAL(compval,
>>>             queue->getEnqueueCompleteMessageCount());
>>>                 msg1->enqueueComplete();
>>>                 compval=1;
>>>             +    BOOST_CHECK_EQUAL(compval,
>>>             queue->getEnqueueCompleteMessageCount());
>>>                 BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>>              }
>>>
>>>
>>>
>>>             ---------------------------------------------------------------------
>>>             Apache Qpid - AMQP Messaging Implementation
>>>             Project:      http://qpid.apache.org
>>>             Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>>
>>>
>>
>>
>
>


Re: Remove linear queue functions

Posted by chenta lee <ch...@gmail.com>.
Carl,
Did those patches merge into the current source tree? And could you tell me
which patches they are?

Chenta


Re: Remove linear queue functions

Posted by Carl Trieloff <cc...@redhat.com>.
Chenta,

Two things to note: Gordon has now put in a patch that corrects the
order of the messages on requeue, i.e. they now stay in order by
sequence number even after rollback. Thus, with his patch and my seek()
seekAt(), we should now be able to resolve the acquire case quite easily
and update the position.

Take a look and let me know if you need some help with that.

(i.e. before these two patches it would not have been possible, but now 
I believe it is)

Carl.
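
One possible reading of "resolve the acquire case and update the position"
is sketched below. It is only an illustration under stated assumptions:
Cursor, nextAfter and rewindFor are hypothetical names (not the seek() or
seekAt() functions mentioned above), the queue is assumed to be sorted by
position, and sequence-number wrap-around is deliberately ignored.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for the broker's QueuedMessage.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

// Hypothetical per-consumer cursor: position of the last message offered to it.
struct Cursor {
    std::uint32_t position;
};

// First message strictly after the cursor, or messages.end() if there is none.
// Assumes `messages` is sorted by position.
std::deque<Msg>::const_iterator nextAfter(const std::deque<Msg>& messages, const Cursor& c) {
    const Msg key = {c.position};
    return std::upper_bound(messages.begin(), messages.end(), key);
}

// If a message is requeued at or before the cursor, move the cursor back to
// just before it so the consumer can be offered that message again.
void rewindFor(Cursor& c, std::uint32_t requeuedPos) {
    assert(requeuedPos > 0);  // this sketch does not handle wrap-around
    if (requeuedPos <= c.position) c.position = requeuedPos - 1;
}
```

With the queue kept in sequence order, rewinding a cursor is enough for the
next lookup to land on the requeued message; without that ordering the
binary search would not be valid.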


chenta lee wrote:
> I did the consumer sequence number wrap-around because, when we
> requeue a message, I cannot know which consumer consumed it.
> Therefore, I cannot update the consumer's sequence number when
> messages are rolled back. The consequence is that when consumers roll
> back (requeue) messages, they cannot acquire them anymore (because
> requeue_msg.position is always larger than consumer.position).
>
> However, my patch is not that dirty :), I didn't change the original
> algorithm. We do not update the consumer sequence in
> consumeNextMessage at the very beginning. From my point of view, the
> only concern is that users who decide to use a selector on their
> messages might suffer from performance issues; other users who do not
> use a selector will be just fine.
>
> Chenta
>
> On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff <cctrieloff@redhat.com> wrote:
>
>
>     Thanks, the one remaining issue I know of with the selector patch
>     is that consumer
>     sequence number wrap-around I don't think works.
>
>     We need a test there and maybe change the comp operators in your
>     patch. I was looking into
>     that last week on the selector patch, I'm itching to get the patch in.
>
>     Carl.
>
>
>
>     chenta lee wrote:
>>     Hi Carl,
>>     This patch looks great, I will update the selector patch later.
>>
>>     Chenta
>>
>>     On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff
>>     <cctrieloff@redhat.com> wrote:
>>
>>
>>         I created a patch which seems to work well. It targets
>>         querying the queue, count, and acquire, making queue access
>>         faster for large queues (best case 1 step {if no requeue or
>>         acquire}, worst case a binary search). In most cases it is
>>         faster than binary search even if requeue or a selector is
>>         used.
>>
>>         It does require that the re-queue order be corrected - which
>>         should be done regardless.
>>
>>         The remaining function that could use similar treatment is
>>         Queue::seek().
>>
>>         Any thoughts on the patch? It opens the way for reasonable
>>         selector performance.
>>         Carl.
>>
>>
>>         Index: qpid/broker/Queue.cpp
>>         ===================================================================
>>         --- qpid/broker/Queue.cpp       (revision 833135)
>>         +++ qpid/broker/Queue.cpp       (working copy)
>>         @@ -243,18 +243,18 @@
>>          {
>>             Mutex::ScopedLock locker(messageLock);
>>             QPID_LOG(debug, "Attempting to acquire message at " <<
>>         position);
>>         -    for (Messages::iterator i = messages.begin(); i !=
>>         messages.end(); i++) {
>>         -        if (i->position == position) {
>>         -            message = *i;
>>         -            if (lastValueQueue) {
>>         -                clearLVQIndex(*i);
>>         -            }
>>         -            QPID_LOG(debug,
>>         -                     "Acquired message at " << i->position
>>         << " from " << name);
>>         -            messages.erase(i);
>>         -            return true;
>>         +
>>         +    Messages::iterator i = findAt(position);
>>         +    if (i != messages.end() ) {
>>         +        message = *i;
>>         +        if (lastValueQueue) {
>>         +            clearLVQIndex(*i);
>>                 }
>>         -    }
>>         +        QPID_LOG(debug,
>>         +                 "Acquired message at " << i->position << "
>>         from " << name);
>>         +        messages.erase(i);
>>         +        return true;
>>         +    }
>>             QPID_LOG(debug, "Could not acquire message at " <<
>>         position << " from " << name << "; no message at that position");
>>             return false;
>>          }
>>         @@ -262,21 +262,21 @@
>>          bool Queue::acquire(const QueuedMessage& msg) {
>>             Mutex::ScopedLock locker(messageLock);
>>             QPID_LOG(debug, "attempting to acquire " << msg.position);
>>         -    for (Messages::iterator i = messages.begin(); i !=
>>         messages.end(); i++) {
>>         -        if ((i->position == msg.position && !lastValueQueue)
>>         // note that in some cases payload not be set
>>         -            || (lastValueQueue && (i->position ==
>>         msg.position) &&
>>         -                msg.payload.get() ==
>>         checkLvqReplace(*i).payload.get()) )  {
>>         +    Messages::iterator i = findAt(msg.position);
>>         +    if (i != messages.end() && // note that in some cases payload may not be set
>>         +        (!lastValueQueue ||
>>         +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>>
>>         -            clearLVQIndex(msg);
>>         -            QPID_LOG(debug,
>>         -                     "Match found, acquire succeeded: " <<
>>         -                     i->position << " == " << msg.position);
>>         -            messages.erase(i);
>>         -            return true;
>>         -        } else {
>>         -            QPID_LOG(debug, "No match: " << i->position << "
>>         != " << msg.position);
>>         -        }
>>         +        clearLVQIndex(msg);
>>         +        QPID_LOG(debug,
>>         +                 "Match found, acquire succeeded: " <<
>>         +                 i->position << " == " << msg.position);
>>         +        messages.erase(i);
>>         +        return true;
>>         +    } else {
>>         +        QPID_LOG(debug, "No match: " << i->position << " !=
>>         " << msg.position);
>>             }
>>         +
>>             QPID_LOG(debug, "Acquire failed for " << msg.position);
>>             return false;
>>          }
>>         @@ -445,19 +445,35 @@
>>             return false;
>>          }
>>
>>         -namespace {
>>         -struct PositionEquals {
>>         -    SequenceNumber pos;
>>         -    PositionEquals(SequenceNumber p) : pos(p) {}
>>         -    bool operator()(const QueuedMessage& msg) const { return
>>         msg.position == pos; }
>>         -};
>>         -}// namespace
>>         +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>>
>>         +    if(!messages.empty()){
>>         +        QueuedMessage compM;
>>         +        compM.position = pos;
>>         +        unsigned long diff = pos.getValue() -
>>         messages.front().position.getValue();
>>         +        long maxEnd = diff < messages.size()? diff :
>>         messages.size();
>>         +
>>         +        Messages::iterator i =
>>         lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>         +        if (i->position == pos)
>>         +            return i;
>>         +    }
>>         +    return messages.end(); // no match found.
>>         +}
>>         +
>>         +
>>          QueuedMessage Queue::find(SequenceNumber pos) const {
>>         +
>>             Mutex::ScopedLock locker(messageLock);
>>         -    Messages::const_iterator i =
>>         std::find_if(messages.begin(), messages.end(),
>>         PositionEquals(pos));
>>         -    if (i != messages.end())
>>         -        return *i;
>>         +    if(!messages.empty()){
>>         +        QueuedMessage compM;
>>         +        compM.position = pos;
>>         +        unsigned long diff = pos.getValue() -
>>         messages.front().position.getValue();
>>         +        long maxEnd = diff < messages.size()? diff :
>>         messages.size();
>>         +
>>         +        Messages::const_iterator i =
>>         lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>         +        if (i != messages.end())
>>         +            return *i;
>>         +    }
>>             return QueuedMessage();
>>          }
>>
>>         @@ -642,10 +658,9 @@
>>          }
>>
>>          /** function only provided for unit tests, or code not in
>>         critical message path */
>>         -uint32_t Queue::getMessageCount() const
>>         +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>          {
>>             Mutex::ScopedLock locker(messageLock);
>>         -
>>             uint32_t count = 0;
>>             for ( Messages::const_iterator i = messages.begin(); i !=
>>         messages.end(); ++i ) {
>>                 //NOTE: don't need to use checkLvqReplace() here as it
>>         @@ -657,6 +672,12 @@
>>             return count;
>>          }
>>
>>         +uint32_t Queue::getMessageCount() const
>>         +{
>>         +    Mutex::ScopedLock locker(messageLock);
>>         +    return messages.size();
>>         +}
>>         +
>>          uint32_t Queue::getConsumerCount() const
>>          {
>>             Mutex::ScopedLock locker(consumerLock);
>>         Index: qpid/broker/QueuedMessage.h
>>         ===================================================================
>>         --- qpid/broker/QueuedMessage.h (revision 833135)
>>         +++ qpid/broker/QueuedMessage.h (working copy)
>>         @@ -38,7 +38,9 @@
>>             QueuedMessage(Queue* q, boost::intrusive_ptr<Message>
>>         msg, framing::SequenceNumber sn) :
>>                 payload(msg), position(sn), queue(q) {}
>>             QueuedMessage(Queue* q) : queue(q) {}
>>         +
>>          };
>>         +    inline bool operator<(const QueuedMessage& a, const
>>         QueuedMessage& b) { return a.position < b.position; }
>>
>>          }}
>>
>>         Index: qpid/broker/Queue.h
>>         ===================================================================
>>         --- qpid/broker/Queue.h (revision 833135)
>>         +++ qpid/broker/Queue.h (working copy)
>>         @@ -148,6 +148,8 @@
>>                             }
>>                         }
>>                     }
>>         +
>>         +            Messages::iterator
>>         findAt(framing::SequenceNumber pos);
>>
>>                 public:
>>
>>         @@ -221,6 +223,7 @@
>>                     uint32_t move(const Queue::shared_ptr destq,
>>         uint32_t qty);
>>
>>                     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>>         +            QPID_BROKER_EXTERN uint32_t
>>         getEnqueueCompleteMessageCount() const;
>>                     QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>>                     inline const string& getName() const { return name; }
>>                     bool isExclusiveOwner(const OwnershipToken* const
>>         o) const;
>>         Index: tests/QueueTest.cpp
>>         ===================================================================
>>         --- tests/QueueTest.cpp (revision 833135)
>>         +++ tests/QueueTest.cpp (working copy)
>>         @@ -120,9 +120,10 @@
>>             queue->process(msg1);
>>             sleep(2);
>>             uint32_t compval=0;
>>         -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>         +    BOOST_CHECK_EQUAL(compval,
>>         queue->getEnqueueCompleteMessageCount());
>>             msg1->enqueueComplete();
>>             compval=1;
>>         +    BOOST_CHECK_EQUAL(compval,
>>         queue->getEnqueueCompleteMessageCount());
>>             BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>          }
>>
>>
>>
>>         ---------------------------------------------------------------------
>>         Apache Qpid - AMQP Messaging Implementation
>>         Project:      http://qpid.apache.org
>>         Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>         <ma...@qpid.apache.org>
>>
>>
>
>


Re: Remove liner queue functions

Posted by chenta lee <ch...@gmail.com>.
I added the consumer sequence number wrap-around because, when we requeue a
message, I cannot know which consumer consumed it. Therefore, I cannot update
the consumer's sequence number when messages are rolled back. The consequence
is that when a consumer rolls back (requeues) messages, it cannot acquire them
anymore (because requeue_msg.position is always larger than
consumer.position).

However, my patch is not that dirty :). I didn't change the original
algorithm. We do not update the consumer sequence in consumeNextMessage at
the very beginning. From my point of view, the only concern is that users who
decide to use a selector on their messages might suffer a performance hit,
while users who do not use a selector will be just fine.
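
The rollback case above depends on where a requeued message lands in the queue. As a minimal sketch, assuming requeue is meant to keep the queue sorted by position (which is what the lookup patch requires), an ordered requeue could look like the following. `Msg`, `byPosition` and `requeueInOrder` are hypothetical names for illustration, not Qpid's actual types or functions.

```cpp
#include <algorithm>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a queued message: only the position matters here.
struct Msg {
    std::uint32_t position;
};

// Orders messages by position (ignores sequence number wrap-around).
inline bool byPosition(const Msg& a, const Msg& b) {
    return a.position < b.position;
}

// Puts a rolled-back message back at its sorted place instead of at the
// front or back, so that later binary searches by position stay valid.
inline void requeueInOrder(std::deque<Msg>& messages, const Msg& msg) {
    std::deque<Msg>::iterator where =
        std::lower_bound(messages.begin(), messages.end(), msg, byPosition);
    messages.insert(where, msg);
}
```

Inserting into the middle of a deque is linear in the worst case, so this trades a slower requeue for faster lookups; whether that trade suits the broker is a judgment call this sketch does not settle.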

Chenta

On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff <cc...@redhat.com> wrote:

>
> Thanks. The one remaining issue I know of with the selector patch is that I
> don't think consumer sequence number wrap-around works.
>
> We need a test there, and may need to change the comparison operators in your
> patch. I was looking into that last week on the selector patch; I'm itching
> to get the patch in.
>
> Carl.
>
>
>
> chenta lee wrote:
>
> Hi Carl,
> This patch looks great, I will update the selector patch later.
>
>  Chenta
>
> On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <cc...@redhat.com> wrote:
>
>>
>> I created a patch which seems to work well. It targets querying the queue,
>> count, and acquire, making queue access faster for large queues (best case 1
>> {if no requeue or acquire}, worst case binary search). In most cases it is
>> faster than binary search even if requeue or a selector is used.
>>
>> It does require that the re-queue order be corrected - which should be
>> done regardless.
>>
>> The remaining function that could use some similar dressing would be
>> Queue::seek()
>>
>> Any thoughts on the patch... This patch opens the way for reasonable
>> selector performance.
>> Carl.
>>
>>
>> Index: qpid/broker/Queue.cpp
>> ===================================================================
>> --- qpid/broker/Queue.cpp       (revision 833135)
>> +++ qpid/broker/Queue.cpp       (working copy)
>> @@ -243,18 +243,18 @@
>>  {
>>     Mutex::ScopedLock locker(messageLock);
>>     QPID_LOG(debug, "Attempting to acquire message at " << position);
>> -    for (Messages::iterator i = messages.begin(); i != messages.end();
>> i++) {
>> -        if (i->position == position) {
>> -            message = *i;
>> -            if (lastValueQueue) {
>> -                clearLVQIndex(*i);
>> -            }
>> -            QPID_LOG(debug,
>> -                     "Acquired message at " << i->position << " from " <<
>> name);
>> -            messages.erase(i);
>> -            return true;
>> +
>> +    Messages::iterator i = findAt(position);
>> +    if (i != messages.end() ) {
>> +        message = *i;
>> +        if (lastValueQueue) {
>> +            clearLVQIndex(*i);
>>         }
>> -    }
>> +        QPID_LOG(debug,
>> +                 "Acquired message at " << i->position << " from " <<
>> name);
>> +        messages.erase(i);
>> +        return true;
>> +    }
>>     QPID_LOG(debug, "Could not acquire message at " << position << " from
>> " << name << "; no message at that position");
>>     return false;
>>  }
>> @@ -262,21 +262,21 @@
>>  bool Queue::acquire(const QueuedMessage& msg) {
>>     Mutex::ScopedLock locker(messageLock);
>>     QPID_LOG(debug, "attempting to acquire " << msg.position);
>> -    for (Messages::iterator i = messages.begin(); i != messages.end();
>> i++) {
>> -        if ((i->position == msg.position && !lastValueQueue) // note that
>> in some cases payload not be set
>> -            || (lastValueQueue && (i->position == msg.position) &&
>> -                msg.payload.get() == checkLvqReplace(*i).payload.get()) )
>>  {
>> +    Messages::iterator i = findAt(msg.position);
>> +    if ((i != messages.end() && !lastValueQueue) // note that in some
>> cases payload not be set
>> +        || (lastValueQueue && (i->position == msg.position) &&
>> +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>>
>> -            clearLVQIndex(msg);
>> -            QPID_LOG(debug,
>> -                     "Match found, acquire succeeded: " <<
>> -                     i->position << " == " << msg.position);
>> -            messages.erase(i);
>> -            return true;
>> -        } else {
>> -            QPID_LOG(debug, "No match: " << i->position << " != " <<
>> msg.position);
>> -        }
>> +        clearLVQIndex(msg);
>> +        QPID_LOG(debug,
>> +                 "Match found, acquire succeeded: " <<
>> +                 i->position << " == " << msg.position);
>> +        messages.erase(i);
>> +        return true;
>> +    } else {
>> +        QPID_LOG(debug, "No match: " << i->position << " != " <<
>> msg.position);
>>     }
>> +
>>     QPID_LOG(debug, "Acquire failed for " << msg.position);
>>     return false;
>>  }
>> @@ -445,19 +445,35 @@
>>     return false;
>>  }
>>
>> -namespace {
>> -struct PositionEquals {
>> -    SequenceNumber pos;
>> -    PositionEquals(SequenceNumber p) : pos(p) {}
>> -    bool operator()(const QueuedMessage& msg) const { return msg.position
>> == pos; }
>> -};
>> -}// namespace
>> +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>>
>> +    if(!messages.empty()){
>> +        QueuedMessage compM;
>> +        compM.position = pos;
>> +        unsigned long diff = pos.getValue() -
>> messages.front().position.getValue();
>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>> +
>> +        Messages::iterator i =
>> lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>> +        if (i->position == pos)
>> +            return i;
>> +    }
>> +    return messages.end(); // no match found.
>> +}
>> +
>> +
>>  QueuedMessage Queue::find(SequenceNumber pos) const {
>> +
>>     Mutex::ScopedLock locker(messageLock);
>> -    Messages::const_iterator i = std::find_if(messages.begin(),
>> messages.end(), PositionEquals(pos));
>> -    if (i != messages.end())
>> -        return *i;
>> +    if(!messages.empty()){
>> +        QueuedMessage compM;
>> +        compM.position = pos;
>> +        unsigned long diff = pos.getValue() -
>> messages.front().position.getValue();
>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>> +
>> +        Messages::const_iterator i =
>> lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>> +        if (i != messages.end())
>> +            return *i;
>> +    }
>>     return QueuedMessage();
>>  }
>>
>> @@ -642,10 +658,9 @@
>>  }
>>
>>  /** function only provided for unit tests, or code not in critical
>> message path */
>> -uint32_t Queue::getMessageCount() const
>> +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>  {
>>     Mutex::ScopedLock locker(messageLock);
>> -
>>     uint32_t count = 0;
>>     for ( Messages::const_iterator i = messages.begin(); i !=
>> messages.end(); ++i ) {
>>         //NOTE: don't need to use checkLvqReplace() here as it
>> @@ -657,6 +672,12 @@
>>     return count;
>>  }
>>
>> +uint32_t Queue::getMessageCount() const
>> +{
>> +    Mutex::ScopedLock locker(messageLock);
>> +    return messages.size();
>> +}
>> +
>>  uint32_t Queue::getConsumerCount() const
>>  {
>>     Mutex::ScopedLock locker(consumerLock);
>> Index: qpid/broker/QueuedMessage.h
>> ===================================================================
>> --- qpid/broker/QueuedMessage.h (revision 833135)
>> +++ qpid/broker/QueuedMessage.h (working copy)
>> @@ -38,7 +38,9 @@
>>     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg,
>> framing::SequenceNumber sn) :
>>         payload(msg), position(sn), queue(q) {}
>>     QueuedMessage(Queue* q) : queue(q) {}
>> +
>>  };
>> +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
>> { return a.position < b.position; }
>>
>>  }}
>>
>> Index: qpid/broker/Queue.h
>> ===================================================================
>> --- qpid/broker/Queue.h (revision 833135)
>> +++ qpid/broker/Queue.h (working copy)
>> @@ -148,6 +148,8 @@
>>                     }
>>                 }
>>             }
>> +
>> +            Messages::iterator findAt(framing::SequenceNumber pos);
>>
>>         public:
>>
>> @@ -221,6 +223,7 @@
>>             uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>>
>>             QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>> +            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount()
>> const;
>>             QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>>             inline const string& getName() const { return name; }
>>             bool isExclusiveOwner(const OwnershipToken* const o) const;
>> Index: tests/QueueTest.cpp
>> ===================================================================
>> --- tests/QueueTest.cpp (revision 833135)
>> +++ tests/QueueTest.cpp (working copy)
>> @@ -120,9 +120,10 @@
>>     queue->process(msg1);
>>     sleep(2);
>>     uint32_t compval=0;
>> -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>     msg1->enqueueComplete();
>>     compval=1;
>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>     BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>  }
>>
>>
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project:      http://qpid.apache.org
>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>
>
>
>
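
For readers following the quoted patch, the findAt() idea can be sketched standalone: because positions increase along the queue, a message at `pos` can sit no further from the front than `pos - front.position`, so the binary search only needs to cover that prefix. This is a hedged illustration rather than the patch itself. `Msg` is a hypothetical stand-in for QueuedMessage, the queue is assumed to be sorted by strictly increasing position with no wrap-around, and unlike the quoted code it checks the iterator before dereferencing it.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for QueuedMessage: only the position matters here.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) {
    return a.position < b.position;
}

// Returns an iterator to the message at `pos`, or messages.end() if absent.
// Assumes `messages` is sorted by strictly increasing position (no wrap).
inline std::deque<Msg>::const_iterator findAt(const std::deque<Msg>& messages,
                                              std::uint32_t pos) {
    if (messages.empty() || pos < messages.front().position)
        return messages.end();

    // With no gaps the message sits exactly `diff` slots from the front;
    // gaps can only move it closer, so indices [0, diff] bound the search.
    const std::size_t diff = pos - messages.front().position;
    const std::size_t last = diff < messages.size() ? diff + 1 : messages.size();

    const Msg key = {pos};
    const std::deque<Msg>::const_iterator end =
        messages.begin() + static_cast<std::ptrdiff_t>(last);
    const std::deque<Msg>::const_iterator i =
        std::lower_bound(messages.begin(), end, key);

    // lower_bound may return `end`, so test before dereferencing.
    if (i != end && i->position == pos)
        return i;
    return messages.end();
}
```

When nothing has been acquired or requeued out of order, the search range collapses towards the single slot at `diff`, which matches the "best case 1" behaviour described in the original post.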

Re: Remove liner queue functions

Posted by Carl Trieloff <cc...@redhat.com>.
Thanks. The one remaining issue I know of with the selector patch is that I
don't think consumer sequence number wrap-around works.

We need a test there, and may need to change the comparison operators in your
patch. I was looking into that last week on the selector patch; I'm itching
to get the patch in.
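
As a hedged illustration of the kind of comparison a wrap-around test would exercise, the sketch below compares 32-bit sequence numbers in the style of serial number arithmetic (RFC 1982). `serialLess` is a hypothetical helper, not Qpid's SequenceNumber operator, and it assumes the two values are less than half the number space apart.

```cpp
#include <cstdint>

// Hypothetical helper: true if sequence number `a` comes before `b`,
// even when the counter has wrapped from 0xFFFFFFFF back to 0.
inline bool serialLess(std::uint32_t a, std::uint32_t b) {
    // The unsigned difference wraps modulo 2^32; reading it as a signed
    // value is negative exactly when `a` is (less than 2^31) behind `b`.
    return static_cast<std::int32_t>(a - b) < 0;
}
```

A plain `a < b` on the raw values would report that 0 comes before 0xFFFFFFFF, which is the ordering a wrap-around test case would need to rule out.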

Carl.


chenta lee wrote:
> Hi Carl,
> This patch looks great, I will update the selector patch later.
>
> Chenta
>
> On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <cctrieloff@redhat.com 
> <ma...@redhat.com>> wrote:
>
>
>     I created a patch which seems to work well. It targets querying
>     the queue, count, and acquire, making queue access faster for large
>     queues (best case 1 {if no requeue or acquire}, worst case binary
>     search). In most cases it is faster than binary search even if
>     requeue or a selector is used.
>
>     It does require that the re-queue order be corrected - which
>     should be done regardless.
>
>     The remaining function that could use some similar dressing would
>     be Queue::seek()
>
>     Any thoughts on the patch... This patch opens the way for
>     reasonable selector performance.
>     Carl.
>
>
>
>


Re: Remove liner queue functions

Posted by chenta lee <ch...@gmail.com>.
Hi Carl,
This patch looks great, I will update the selector patch later.

Chenta

On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <cc...@redhat.com> wrote:

>
> I created a patch which seems to work well. It targets querying the queue,
> count, and acquire, making queue access faster for large queues (best case 1
> {if no requeue or acquire}, worst case binary search). In most cases it is
> faster than binary search even if requeue or a selector is used.
>
> It does require that the re-queue order be corrected - which should be done
> regardless.
>
> The remaining function that could use some similar dressing would be
> Queue::seek()
>
> Any thoughts on the patch... This patch opens the way for reasonable
> selector performance.
> Carl.
>
>
> Index: qpid/broker/Queue.cpp
> ===================================================================
> --- qpid/broker/Queue.cpp       (revision 833135)
> +++ qpid/broker/Queue.cpp       (working copy)
> @@ -243,18 +243,18 @@
>  {
>     Mutex::ScopedLock locker(messageLock);
>     QPID_LOG(debug, "Attempting to acquire message at " << position);
> -    for (Messages::iterator i = messages.begin(); i != messages.end();
> i++) {
> -        if (i->position == position) {
> -            message = *i;
> -            if (lastValueQueue) {
> -                clearLVQIndex(*i);
> -            }
> -            QPID_LOG(debug,
> -                     "Acquired message at " << i->position << " from " <<
> name);
> -            messages.erase(i);
> -            return true;
> +
> +    Messages::iterator i = findAt(position);
> +    if (i != messages.end() ) {
> +        message = *i;
> +        if (lastValueQueue) {
> +            clearLVQIndex(*i);
>         }
> -    }
> +        QPID_LOG(debug,
> +                 "Acquired message at " << i->position << " from " <<
> name);
> +        messages.erase(i);
> +        return true;
> +    }
>     QPID_LOG(debug, "Could not acquire message at " << position << " from "
> << name << "; no message at that position");
>     return false;
>  }
> @@ -262,21 +262,21 @@
>  bool Queue::acquire(const QueuedMessage& msg) {
>     Mutex::ScopedLock locker(messageLock);
>     QPID_LOG(debug, "attempting to acquire " << msg.position);
> -    for (Messages::iterator i = messages.begin(); i != messages.end();
> i++) {
> -        if ((i->position == msg.position && !lastValueQueue) // note that
> in some cases payload not be set
> -            || (lastValueQueue && (i->position == msg.position) &&
> -                msg.payload.get() == checkLvqReplace(*i).payload.get()) )
>  {
> +    Messages::iterator i = findAt(msg.position);
> +    if ((i != messages.end() && !lastValueQueue) // note that in some
> cases payload not be set
> +        || (lastValueQueue && (i->position == msg.position) &&
> +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>
> -            clearLVQIndex(msg);
> -            QPID_LOG(debug,
> -                     "Match found, acquire succeeded: " <<
> -                     i->position << " == " << msg.position);
> -            messages.erase(i);
> -            return true;
> -        } else {
> -            QPID_LOG(debug, "No match: " << i->position << " != " <<
> msg.position);
> -        }
> +        clearLVQIndex(msg);
> +        QPID_LOG(debug,
> +                 "Match found, acquire succeeded: " <<
> +                 i->position << " == " << msg.position);
> +        messages.erase(i);
> +        return true;
> +    } else {
> +        QPID_LOG(debug, "No match: " << i->position << " != " <<
> msg.position);
>     }
> +
>     QPID_LOG(debug, "Acquire failed for " << msg.position);
>     return false;
>  }
> @@ -445,19 +445,35 @@
>     return false;
>  }
>
> -namespace {
> -struct PositionEquals {
> -    SequenceNumber pos;
> -    PositionEquals(SequenceNumber p) : pos(p) {}
> -    bool operator()(const QueuedMessage& msg) const { return msg.position
> == pos; }
> -};
> -}// namespace
> +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>
> +    if(!messages.empty()){
> +        QueuedMessage compM;
> +        compM.position = pos;
> +        unsigned long diff = pos.getValue() -
> messages.front().position.getValue();
> +        long maxEnd = diff < messages.size()? diff : messages.size();
> +
> +        Messages::iterator i =
> lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
> +        if (i->position == pos)
> +            return i;
> +    }
> +    return messages.end(); // no match found.
> +}
> +
> +
>  QueuedMessage Queue::find(SequenceNumber pos) const {
> +
>     Mutex::ScopedLock locker(messageLock);
> -    Messages::const_iterator i = std::find_if(messages.begin(),
> messages.end(), PositionEquals(pos));
> -    if (i != messages.end())
> -        return *i;
> +    if(!messages.empty()){
> +        QueuedMessage compM;
> +        compM.position = pos;
> +        unsigned long diff = pos.getValue() -
> messages.front().position.getValue();
> +        long maxEnd = diff < messages.size()? diff : messages.size();
> +
> +        Messages::const_iterator i =
> lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
> +        if (i != messages.end())
> +            return *i;
> +    }
>     return QueuedMessage();
>  }
>
> @@ -642,10 +658,9 @@
>  }
>
>  /** function only provided for unit tests, or code not in critical message
> path */
> -uint32_t Queue::getMessageCount() const
> +uint32_t Queue::getEnqueueCompleteMessageCount() const
>  {
>     Mutex::ScopedLock locker(messageLock);
> -
>     uint32_t count = 0;
>     for ( Messages::const_iterator i = messages.begin(); i !=
> messages.end(); ++i ) {
>         //NOTE: don't need to use checkLvqReplace() here as it
> @@ -657,6 +672,12 @@
>     return count;
>  }
>
> +uint32_t Queue::getMessageCount() const
> +{
> +    Mutex::ScopedLock locker(messageLock);
> +    return messages.size();
> +}
> +
>  uint32_t Queue::getConsumerCount() const
>  {
>     Mutex::ScopedLock locker(consumerLock);
> Index: qpid/broker/QueuedMessage.h
> ===================================================================
> --- qpid/broker/QueuedMessage.h (revision 833135)
> +++ qpid/broker/QueuedMessage.h (working copy)
> @@ -38,7 +38,9 @@
>     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg,
> framing::SequenceNumber sn) :
>         payload(msg), position(sn), queue(q) {}
>     QueuedMessage(Queue* q) : queue(q) {}
> +
>  };
> +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
> { return a.position < b.position; }
>
>  }}
>
> Index: qpid/broker/Queue.h
> ===================================================================
> --- qpid/broker/Queue.h (revision 833135)
> +++ qpid/broker/Queue.h (working copy)
> @@ -148,6 +148,8 @@
>                     }
>                 }
>             }
> +
> +            Messages::iterator findAt(framing::SequenceNumber pos);
>
>         public:
>
> @@ -221,6 +223,7 @@
>             uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>
>             QPID_BROKER_EXTERN uint32_t getMessageCount() const;
> +            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
>             QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>             inline const string& getName() const { return name; }
>             bool isExclusiveOwner(const OwnershipToken* const o) const;
> Index: tests/QueueTest.cpp
> ===================================================================
> --- tests/QueueTest.cpp (revision 833135)
> +++ tests/QueueTest.cpp (working copy)
> @@ -120,9 +120,10 @@
>     queue->process(msg1);
>     sleep(2);
>     uint32_t compval=0;
> -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>     msg1->enqueueComplete();
>     compval=1;
> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>     BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>  }
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>

Re: Remove liner queue functions

Posted by Gordon Sim <gs...@redhat.com>.
On 11/06/2009 07:30 PM, Carl Trieloff wrote:
> +    if(!messages.empty()){
> +        QueuedMessage compM;
> +        compM.position = pos;
> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
> +        long maxEnd = diff < messages.size()? diff : messages.size();
> +
> +        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
> +        if (i->position == pos)
> +            return i;

That looks unsafe to me; if lower_bound() returns messages.end(), 
i->position will not be valid.
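
For illustration, here is a minimal sketch of one way the lookup could be guarded. It is not the code that went into the tree: Msg is a hypothetical stand-in for QueuedMessage that models only the position as a plain uint32_t, and it assumes the container is sorted by position with no sequence-number wrap-around. The iterator is compared against the end of the searched range before it is dereferenced, and the range is widened by one so that the slot at (pos - front) is itself searched.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for QueuedMessage; only the position is modelled.
struct Msg {
    uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) { return a.position < b.position; }

typedef std::deque<Msg> Messages;

// Returns an iterator to the message at `pos`, or messages.end() if absent.
// Assumes `messages` is sorted by position and positions do not wrap.
Messages::const_iterator findAt(const Messages& messages, uint32_t pos) {
    if (messages.empty()) return messages.end();
    uint32_t front = messages.front().position;
    if (pos < front) return messages.end();

    // With gaps allowed, the target sits at most (pos - front) slots from the
    // front, so the half-open search range needs one slot more than that.
    std::size_t maxEnd = messages.size();
    if (pos - front < maxEnd) maxEnd = (pos - front) + 1;

    Msg key = { pos };
    Messages::const_iterator last = messages.begin() + maxEnd;
    Messages::const_iterator i = std::lower_bound(messages.begin(), last, key);

    // lower_bound returns `last` when every element is smaller than the key,
    // so check for that before touching i->position.
    if (i != last && i->position == pos) return i;
    return messages.end();
}
```

The exact-match test also matters for find(): lower_bound() yields the first element that is not less than the key, which need not be the requested position.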
