Posted to dev@qpid.apache.org by Gordon Sim <gs...@redhat.com> on 2009/09/14 14:46:35 UTC

Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

On 09/11/2009 02:33 PM, kpvdr@apache.org wrote:
> Author: kpvdr
> Date: Fri Sep 11 13:33:42 2009
> New Revision: 813825
>
> URL: http://svn.apache.org/viewvc?rev=813825&view=rev
> Log:
> Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.

I'm not very keen on some of these changes. While the original code is 
already unpleasant, this allows the bad smells to leak further into the 
codebase.

On a separate point, though it's great to get whitespace fixed up, when a
commit like this one contains >60% whitespace changes mixed up with
various real semantic changes it becomes much harder to review.

However, back to the more important issue, the actual changes and the 
specific things I dislike:

* setting the store on a message in Queue::push()

This means that if a message previously pushed on to a durable queue 
then gets pushed on to a transient queue, the store pointer will be set 
to null.

Though that case could be fixed by a simple check, this seems like the
wrong place to be associating the message with a store. In fact the same
line has also been added to the MessageBuilder (which I think is a
reasonable change), so it's not clear why it's called again here.

(And though checking whether all queues are persistent before routing 
may mean that in the common cases overwriting the store causes no harm, 
what about the case where a queue is deleted and its messages are routed 
through an alternate exchange?).
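
To make the first point concrete, here is a minimal sketch of the
overwrite and of the 'simple check' mentioned above. The types are
stand-ins rather than the real broker classes; setStoreIfUnset(),
pushOverwriting() and pushGuarded() are hypothetical names, and I am
assuming a transient queue simply has a null store.

```cpp
// Hypothetical stand-ins, not the real qpid::broker classes.
struct MessageStore {};

class Message {
    MessageStore* store;
public:
    Message() : store(nullptr) {}
    MessageStore* getStore() const { return store; }
    // Unconditional assignment, like the setStore() added in Message.h.
    void setStore(MessageStore* s) { store = s; }
    // Guarded variant (hypothetical): keeps a store that is already set.
    void setStoreIfUnset(MessageStore* s) { if (!store) store = s; }
};

// Assumption: a transient queue is modelled as a queue with a null store.
struct Queue {
    MessageStore* store;
    explicit Queue(MessageStore* s) : store(s) {}
    // Models setting the store on every push.
    void pushOverwriting(Message& m) const { m.setStore(store); }
    // Models the same push with the simple check added.
    void pushGuarded(Message& m) const { m.setStoreIfUnset(store); }
};
```

Pushing one message to a durable queue and then to a transient one
leaves the store null with pushOverwriting() and intact with
pushGuarded(). The guard only hides the symptom though; the association
still belongs in the MessageBuilder.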

* iterating through all matching bindings to check that the queues have 
a persistence id and taking that to mean they are persistent and a store 
is loaded before iterating over the same list again to route each message

I find that a little ugly and it leaks aspects of the rather unpleasant 
flow-to-disk 'feature' to the exchange class.

* using qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr as 
the type of the parameter to the new doRoute method on Exchange.

It would be far better to just use a const reference to a
Binding::vector. The CopyOnWriteArray logic is irrelevant to the method
in question; all that is being used is the typedef, which just makes
someone reading the code work harder to figure out that it is irrelevant.

That is particularly the case in the Topic exchange, for which the new
typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr is added to
CopyOnWriteArray for no apparent reason and used in this one place
(again when none of the CopyOnWriteArray logic is being used).
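
As a rough sketch of the signature I have in mind (stand-in types only;
std::shared_ptr is used here in place of the broker's own pointer
typedefs, and the delivered counter merely stands in for
msg.deliverTo()):

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-ins for the broker types.
struct Queue { int delivered = 0; };

struct Binding {
    typedef std::shared_ptr<Binding> shared_ptr;
    typedef std::vector<shared_ptr> vector;
    std::shared_ptr<Queue> queue;
};

// Takes only what it uses: a read-only view of the matched bindings.
// Returns the number of queues the message was delivered to.
inline int doRoute(const Binding::vector& bindings) {
    int count = 0;
    for (Binding::vector::const_iterator i = bindings.begin();
         i != bindings.end(); ++i, ++count) {
        ++(*i)->queue->delivered;  // stands in for msg.deliverTo((*i)->queue)
    }
    return count;
}
```

A caller holding a snapshot would check the pointer and pass *p, while
the Headers and Topic exchanges could build a plain local vector of the
matching bindings and pass that, with no need for a new typedef on
CopyOnWriteArray.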

* perpetuating the unclear division of role between PersistableMessage 
and Message itself; the logic for releasing and controlling the release 
seems split between these two in what seems like an arbitrary manner.

Note also that with this change transactionally published messages will 
never be released from memory as the call to do so (if required) in 
SemanticState will happen before any request has been made to release 
the content.
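
The ordering problem can be seen with the release manager alone. The
class below is condensed from the MessageReleaseManager in the quoted
diff; tryRelease() is a hypothetical, simplified model of the
non-immediate path through Message::releaseContent(), not the real
method.

```cpp
// Condensed from the MessageReleaseManager added by the commit.
class MessageReleaseManager {
    bool releaseBlocked;
    bool releaseRequested;
    bool released;
public:
    MessageReleaseManager()
        : releaseBlocked(false), releaseRequested(false), released(false) {}
    void blockRelease() { if (!released) releaseBlocked = true; }
    void setReleaseRequested() { if (!released) releaseRequested = true; }
    void setReleased() { released = true; }
    bool isReleased() const { return released; }
    bool canRelease() const { return !releaseBlocked && releaseRequested; }
};

// Hypothetical model of releaseContent(false): content is only dropped
// when the manager says it can be. Returns true if a release happened.
inline bool tryRelease(MessageReleaseManager& mgr) {
    if (!mgr.canRelease()) return false;
    mgr.setReleased();
    return true;
}
```

If tryRelease() runs before setReleaseRequested(), as I believe happens
on the transactional path, it does nothing, and a later request on its
own never triggers a release: something has to call it a second time.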

The code for this aspect clearly has a number of problems which we need 
to fix - creating Jiras for these would I think be valuable. I think we 
need to try hard to get the cleanest possible set of changes though, and 
prevent the already incoherent design from deteriorating even further.

>
> Added:
>      qpid/trunk/qpid/cpp/src/qpid/broker/MessageReleaseManager.h
> Modified:
>      qpid/trunk/qpid/cpp/src/Makefile.am
>      qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
>      qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
>      qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
>      qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
>      qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
>      qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
>      qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
>      qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
>      qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
>      qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
>      qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
>
> Modified: qpid/trunk/qpid/cpp/src/Makefile.am
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/Makefile.am (original)
> +++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 11 13:33:42 2009
> @@ -536,6 +536,7 @@
>     qpid/broker/MessageAdapter.h \
>     qpid/broker/MessageBuilder.cpp \
>     qpid/broker/MessageBuilder.h \
> +  qpid/broker/MessageReleaseMgr.h \
>     qpid/broker/MessageStore.h \
>     qpid/broker/MessageStoreModule.cpp \
>     qpid/broker/MessageStoreModule.h \
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -28,7 +28,7 @@
>   using qpid::management::Manageable;
>   namespace _qmf = qmf::org::apache::qpid::broker;
>
> -namespace
> +namespace
>   {
>   const std::string qpidFedOp("qpid.fed.op");
>   const std::string qpidFedTags("qpid.fed.tags");
> @@ -98,7 +98,7 @@
>            */
>           std::vector<std::string>  keys2prop;
>           {
> -            Mutex::ScopedLock l(lock);
> +            Mutex::ScopedLock l(lock);
>               for (Bindings::iterator iter = bindings.begin();
>                    iter != bindings.end(); iter++) {
>                   const BoundKey&  bk = iter->second;
> @@ -150,34 +150,7 @@
>           Mutex::ScopedLock l(lock);
>           p = bindings[routingKey].queues.snapshot();
>       }
> -    int count(0);
> -
> -    if (p) {
> -        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
> -            msg.deliverTo((*i)->queue);
> -            if ((*i)->mgmtBinding != 0)
> -                (*i)->mgmtBinding->inc_msgMatched();
> -        }
> -    }
> -
> -    if(!count){
> -        QPID_LOG(info, "DirectExchange "<<  getName()<<  " could not route message with key "<<  routingKey
> -<<  "; no matching binding found");
> -        if (mgmtExchange != 0) {
> -            mgmtExchange->inc_msgDrops();
> -            mgmtExchange->inc_byteDrops(msg.contentSize());
> -        }
> -    } else {
> -        if (mgmtExchange != 0) {
> -            mgmtExchange->inc_msgRoutes(count);
> -            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
> -        }
> -    }
> -
> -    if (mgmtExchange != 0) {
> -        mgmtExchange->inc_msgReceives();
> -        mgmtExchange->inc_byteReceives(msg.contentSize());
> -    }
> +    doRoute(msg, p);
>   }
>
>
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Sep 11 13:33:42 2009
> @@ -76,6 +76,49 @@
>       }
>   }
>
> +void Exchange::blockContentReleaseCheck(Deliverable&  msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p)
> +{
> +    bool allQueuesPersistent = true;
> +    for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); allQueuesPersistent&&  i!=p->end(); i++) {
> +        allQueuesPersistent = (*i)->queue->getPersistenceId()>  0;
> +    }
> +    if (msg.getMessage().contentSize()&&  (!allQueuesPersistent || (p->size()>  1&&  !msg.getMessage().isPersistent()))) {
> +        msg.getMessage().blockRelease();
> +    }
> +}
> +
> +void Exchange::doRoute(Deliverable&  msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p)
> +{
> +    int count = 0;
> +
> +    if (p.get()) {
> +        blockContentReleaseCheck(msg, p);
> +
> +        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
> +            msg.deliverTo((*i)->queue);
> +            if ((*i)->mgmtBinding != 0)
> +                (*i)->mgmtBinding->inc_msgMatched();
> +        }
> +    }
> +
> +    if (mgmtExchange != 0)
> +    {
> +        mgmtExchange->inc_msgReceives  ();
> +        mgmtExchange->inc_byteReceives (msg.contentSize ());
> +        if (count == 0)
> +        {
> +            //QPID_LOG(warning, "Exchange "<<  getName()<<  " could not route message; no matching binding found");
> +            mgmtExchange->inc_msgDrops  ();
> +            mgmtExchange->inc_byteDrops (msg.contentSize ());
> +        }
> +        else
> +        {
> +            mgmtExchange->inc_msgRoutes  (count);
> +            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
> +        }
> +    }
> +}
> +
>   void Exchange::routeIVE(){
>       if (ive&&  lastMsg.get()){
>           DeliverableMessage dmsg(lastMsg);
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Sep 11 13:33:42 2009
> @@ -10,9 +10,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -29,6 +29,7 @@
>   #include "qpid/broker/MessageStore.h"
>   #include "qpid/broker/PersistableExchange.h"
>   #include "qpid/framing/FieldTable.h"
> +#include "qpid/sys/CopyOnWriteArray.h"
>   #include "qpid/sys/Mutex.h"
>   #include "qpid/management/Manageable.h"
>   #include "qmf/org/apache/qpid/broker/Exchange.h"
> @@ -78,12 +79,14 @@
>       private:
>           Exchange* parent;
>       };
> -
> +
> +    void blockContentReleaseCheck(Deliverable&  msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b);
> +    void doRoute(Deliverable&  msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b);
>       void routeIVE();
> -
> +
>
>       struct MatchQueue {
> -        const Queue::shared_ptr queue;
> +        const Queue::shared_ptr queue;
>           MatchQueue(Queue::shared_ptr q);
>           bool operator()(Exchange::Binding::shared_ptr b);
>       };
> @@ -143,7 +146,7 @@
>       virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
>       QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
>       virtual void route(Deliverable&  msg, const std::string&  routingKey, const qpid::framing::FieldTable* args) = 0;
> -
> +
>       //PersistableExchange:
>       QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
>       uint64_t getPersistenceId() const { return persistenceId; }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -26,7 +26,7 @@
>   using namespace qpid::sys;
>   namespace _qmf = qmf::org::apache::qpid::broker;
>
> -namespace
> +namespace
>   {
>   const std::string qpidFedOp("qpid.fed.op");
>   const std::string qpidFedTags("qpid.fed.tags");
> @@ -106,34 +106,11 @@
>       return true;
>   }
>
> -void FanOutExchange::route(Deliverable&  msg, const string&  /*routingKey*/, const FieldTable* /*args*/){
> +void FanOutExchange::route(Deliverable&  msg, const string&  /*routingKey*/, const FieldTable* /*args*/)
> +{
>       PreRoute pr(msg, this);
> -    uint32_t count(0);
> -
>       BindingsArray::ConstPtr p = bindings.snapshot();
> -    if (p.get()){
> -        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
> -            msg.deliverTo((*i)->queue);
> -            if ((*i)->mgmtBinding != 0)
> -                (*i)->mgmtBinding->inc_msgMatched ();
> -        }
> -    }
> -
> -    if (mgmtExchange != 0)
> -    {
> -        mgmtExchange->inc_msgReceives  ();
> -        mgmtExchange->inc_byteReceives (msg.contentSize ());
> -        if (count == 0)
> -        {
> -            mgmtExchange->inc_msgDrops  ();
> -            mgmtExchange->inc_byteDrops (msg.contentSize ());
> -        }
> -        else
> -        {
> -            mgmtExchange->inc_msgRoutes  (count);
> -            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
> -        }
> -    }
> +    doRoute(msg, p);
>   }
>
>   bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -118,31 +118,17 @@
>
>       PreRoute pr(msg, this);
>
> -    uint32_t count(0);
> -
>       Bindings::ConstPtr p = bindings.snapshot();
> -    if (p.get()){
> +    Bindings::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding>  >);
> +    if (p.get())
> +    {
>           for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
>               if (match((*i)->args, *args)) {
> -                msg.deliverTo((*i)->queue);
> -                count++;
> -                if ((*i)->mgmtBinding != 0)
> -                    (*i)->mgmtBinding->inc_msgMatched();
> +                b->push_back(*i);
>               }
>           }
>       }
> -
> -    if (mgmtExchange != 0) {
> -        mgmtExchange->inc_msgReceives();
> -        mgmtExchange->inc_byteReceives(msg.contentSize());
> -        if (count == 0) {
> -            mgmtExchange->inc_msgDrops();
> -            mgmtExchange->inc_byteDrops(msg.contentSize());
> -        } else {
> -            mgmtExchange->inc_msgRoutes(count);
> -            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
> -        }
> -    }
> +    doRoute(msg, b);
>   }
>
>
> @@ -163,7 +149,7 @@
>
>   const std::string HeadersExchange::typeName("headers");
>
> -namespace
> +namespace
>   {
>
>       bool match_values(const FieldValue&  bind, const FieldValue&  msg) {
> @@ -181,7 +167,7 @@
>                i != bind.end();
>                ++i)
>           {
> -            if (i->first != x_match)
> +            if (i->first != x_match)
>               {
>                   Map::const_iterator j = msg.find(i->first);
>                   if (j == msg.end()) return false;
> @@ -194,7 +180,7 @@
>                i != bind.end();
>                ++i)
>           {
> -            if (i->first != x_match)
> +            if (i->first != x_match)
>               {
>                   Map::const_iterator j = msg.find(i->first);
>                   if (j != msg.end()) {
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -22,6 +22,7 @@
>   #include "qpid/broker/Message.h"
>   #include "qpid/broker/ExchangeRegistry.h"
>   #include "qpid/broker/ExpiryPolicy.h"
> +#include "qpid/broker/NullMessageStore.h"
>   #include "qpid/StringUtils.h"
>   #include "qpid/framing/frame_functors.h"
>   #include "qpid/framing/FieldTable.h"
> @@ -48,7 +49,7 @@
>
>   Message::Message(const framing::SequenceNumber&  id) :
>       frames(id), persistenceId(0), redelivered(false), loaded(false),
> -    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
> +    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
>       expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
>
>   Message::~Message()
> @@ -75,7 +76,7 @@
>       return getAdapter().getRoutingKey(frames);
>   }
>
> -std::string Message::getExchangeName() const
> +std::string Message::getExchangeName() const
>   {
>       return getAdapter().getExchange(frames);
>   }
> @@ -84,7 +85,7 @@
>   {
>       if (!exchange) {
>           exchange = registry.get(getExchangeName());
> -    }
> +    }
>       return exchange;
>   }
>
> @@ -98,7 +99,7 @@
>       return getAdapter().getApplicationHeaders(frames);
>   }
>
> -bool Message::isPersistent()
> +bool Message::isPersistent() const
>   {
>       return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
>   }
> @@ -175,26 +176,25 @@
>       } else {
>           //adjust header flags
>           MarkLastSegment f;
> -        frames.map_if(f, TypeFilter<HEADER_BODY>());
> +        frames.map_if(f, TypeFilter<HEADER_BODY>());
>       }
>       //mark content loaded
>       loaded = true;
>   }
>
> -void Message::releaseContent(MessageStore* _store)
> +void Message::releaseContent(bool immediate)
>   {
> -    if (!store) {
> -        store = _store;
> -    }
> -    if (store) {
> +    if (store&&  !NullMessageStore::isNullStore(store)&&  (immediate || releaseMgr.canRelease())) {
>           if (!getPersistenceId()) {
>               intrusive_ptr<PersistableMessage>  pmsg(this);
>               store->stage(pmsg);
>               staged = true;
> -        }
> -        //remove any content frames from the frameset
> -        frames.remove(TypeFilter<CONTENT_BODY>());
> -        setContentReleased();
> +            frames.remove(TypeFilter<CONTENT_BODY>());
> +            setContentReleased();
> +       } else if (immediate || releaseMgr.canRelease()) {
> +           frames.remove(TypeFilter<CONTENT_BODY>());
> +           setContentReleased();
> +       }
>       }
>   }
>
> @@ -213,7 +213,7 @@
>   {
>       if (isContentReleased()) {
>           intrusive_ptr<const PersistableMessage>  pmsg(this);
> -
> +
>           bool done = false;
>           string&  data = frame.castBody<AMQContentBody>()->getData();
>           store->loadContent(queue, pmsg, data, offset, maxContentSize);
> @@ -239,7 +239,7 @@
>           uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
>           bool morecontent = true;
>           for (uint64_t offset = 0; morecontent; offset += maxContentSize)
> -        {
> +        {
>               AMQFrame frame((AMQContentBody()));
>               morecontent = getContentFrame(queue, frame, maxContentSize, offset);
>               out.handle(frame);
> @@ -257,7 +257,7 @@
>   {
>       sys::Mutex::ScopedLock l(lock);
>       Relay f(out);
> -    frames.map_if(f, TypeFilter<HEADER_BODY>());
> +    frames.map_if(f, TypeFilter<HEADER_BODY>());
>   }
>
>   // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
> @@ -287,7 +287,7 @@
>   }
>
>
> -namespace
> +namespace
>   {
>   const std::string X_QPID_TRACE("x-qpid.trace");
>   }
> @@ -324,13 +324,13 @@
>               trace += ",";
>               trace += id;
>               headers.setString(X_QPID_TRACE, trace);
> -        }
> +        }
>       }
>   }
>
> -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>&  e)
> +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>&  e)
>   {
> -    DeliveryProperties* props = getProperties<DeliveryProperties>();
> +    DeliveryProperties* props = getProperties<DeliveryProperties>();
>       if (props->getTtl()) {
>           // AMQP requires setting the expiration property to be posix
>           // time_t in seconds. TTL is in milliseconds
> @@ -347,7 +347,7 @@
>
>   void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>&  e) {
>       expiryPolicy = e;
> -    if (expiryPolicy)
> +    if (expiryPolicy)
>           expiryPolicy->willExpire(*this);
>   }
>
> @@ -362,7 +362,7 @@
>       Replacement::iterator i = replacement.find(qfor);
>       if (i != replacement.end()){
>           return i->second;
> -    }		
> +    }
>       return empty;
>   }
>
> @@ -410,7 +410,7 @@
>
>   bool Message::isUpdateMessage()
>   {
> -    return updateDestination.size()&&  isA<MessageTransferBody>()
> +    return updateDestination.size()&&  isA<MessageTransferBody>()
>           &&  getMethod<MessageTransferBody>()->getDestination() == updateDestination;
>   }
>
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Sep 11 13:33:42 2009
> @@ -10,9 +10,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -34,12 +34,12 @@
>   #include<vector>
>
>   namespace qpid {
> -	
> +
>   namespace framing {
>   class FieldTable;
>   class SequenceNumber;
>   }
> -	
> +
>   namespace broker {
>   class ConnectionToken;
>   class Exchange;
> @@ -51,10 +51,10 @@
>   class Message : public PersistableMessage {
>   public:
>       typedef boost::function<void (const boost::intrusive_ptr<Message>&)>  MessageCallback;
> -
> +
>       QPID_BROKER_EXTERN Message(const framing::SequenceNumber&  id = framing::SequenceNumber());
>       QPID_BROKER_EXTERN ~Message();
> -
> +
>       uint64_t getPersistenceId() const { return persistenceId; }
>       void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
>
> @@ -74,7 +74,7 @@
>       bool isImmediate() const;
>       QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
>       framing::FieldTable&  getOrInsertHeaders();
> -    QPID_BROKER_EXTERN bool isPersistent();
> +    QPID_BROKER_EXTERN bool isPersistent() const;
>       bool requiresAccept();
>
>       QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>&  e);
> @@ -82,8 +82,8 @@
>       bool hasExpired();
>       sys::AbsTime getExpiration() const { return expiration; }
>
> -    framing::FrameSet&  getFrames() { return frames; }
> -    const framing::FrameSet&  getFrames() const { return frames; }
> +    framing::FrameSet&  getFrames() { return frames; }
> +    const framing::FrameSet&  getFrames() const { return frames; }
>
>       template<class T>  T* getProperties() {
>           qpid::framing::AMQHeaderBody* p = frames.getHeaders();
> @@ -128,13 +128,13 @@
>
>       QPID_BROKER_EXTERN void decodeHeader(framing::Buffer&  buffer);
>       QPID_BROKER_EXTERN void decodeContent(framing::Buffer&  buffer);
> -
> +
>       /**
>        * Releases the in-memory content data held by this
>        * message. Must pass in a store from which the data can
>        * be reloaded.
>        */
> -    void releaseContent(MessageStore* store);
> +    void releaseContent(bool immediate = false);
>       void destroy();
>
>       bool getContentFrame(const Queue&  queue, framing::AMQFrame&  frame, uint16_t maxContentSize, uint64_t offset) const;
> @@ -145,10 +145,11 @@
>
>       bool isExcluded(const std::vector<std::string>&  excludes) const;
>       void addTraceId(const std::string&  id);
> -	
> -	void forcePersistent();
> -	bool isForcedPersistent();
> -
> +
> +    void forcePersistent();
> +    bool isForcedPersistent();
> +    void setStore(MessageStore* s) { store = s; }
> +
>       boost::intrusive_ptr<Message>&  getReplacementMessage(const Queue* qfor) const;
>       void setReplacementMessage(boost::intrusive_ptr<Message>  msg, const Queue* qfor);
>
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Fri Sep 11 13:33:42 2009
> @@ -35,7 +35,6 @@
>       std::string type_str(uint8_t type);
>       const std::string QPID_MANAGEMENT("qpid.management");
>   }
> -
>   MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
>       state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
>
> @@ -80,7 +79,7 @@
>               &&  !NullMessageStore::isNullStore(store)
>               &&  message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
>           {
> -            message->releaseContent(store);
> +            message->releaseContent(true);
>               staging = true;
>           }
>       }
> @@ -96,6 +95,7 @@
>   void MessageBuilder::start(const SequenceNumber&  id)
>   {
>       message = intrusive_ptr<Message>(new Message(id));
> +    message->setStore(store);
>       state = METHOD;
>       staging = false;
>   }
>
> Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageReleaseManager.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageReleaseManager.h?rev=813825&view=auto
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/MessageReleaseManager.h (added)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageReleaseManager.h Fri Sep 11 13:33:42 2009
> @@ -0,0 +1,54 @@
> +/*
> + *
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + *
> + */
> +#ifndef _broker_MessageReleaseManager_h
> +#define _broker_MessageReleaseManager_h
> +
> +namespace qpid {
> +    namespace broker {
> +
> +        class MessageReleaseManager
> +        {
> +        private:
> +            bool releaseBlocked;
> +            bool releaseRequested;
> +            bool released;
> +
> +        public:
> +            MessageReleaseManager(): releaseBlocked(false), releaseRequested(false), released(false) {}
> +            virtual ~MessageReleaseManager() {}
> +
> +            bool isReleaseBlocked() const { return releaseBlocked; }
> +            void blockRelease() { if (!released) releaseBlocked = true; }
> +
> +            bool isReleaseRequested() const { return releaseRequested; }
> +            void setReleaseRequested() { if (!released) releaseRequested = true; }
> +
> +            bool isReleased() const { return released; }
> +            void setReleased() { released = true; }
> +
> +            bool canRelease() { return !releaseBlocked&&  releaseRequested; }
> +        };
> +
> +    }
> +}
> +
> +
> +#endif  /*_broker_MessageReleaseManager_h*/
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -22,6 +22,7 @@
>
>   #include "qpid/broker/PersistableMessage.h"
>   #include "qpid/broker/MessageStore.h"
> +#include "qpid/broker/NullMessageStore.h"
>   #include <iostream>
>
>   using namespace qpid::broker;
> @@ -34,9 +35,8 @@
>   PersistableMessage::~PersistableMessage() {}
>
>   PersistableMessage::PersistableMessage() :
> -    asyncEnqueueCounter(0),
> +    asyncEnqueueCounter(0),
>       asyncDequeueCounter(0),
> -    contentReleased(false),
>       store(0)
>   {}
>
> @@ -56,13 +56,22 @@
>           if (q) {
>               store->flush(*q);
>           }
> -    }
> +    }
>   }
>
> -void PersistableMessage::setContentReleased() {contentReleased = true; }
> +void PersistableMessage::setContentReleased() { releaseMgr.setReleased(); }
> +
> +void PersistableMessage::blockRelease() { releaseMgr.blockRelease(); }
> +
> +bool PersistableMessage::requestContentRelease()
> +{
> +    if (!store || NullMessageStore::isNullStore(store) || releaseMgr.isReleaseBlocked()) return false;
> +    releaseMgr.setReleaseRequested();
> +    return true;
> +}
> +
> +bool PersistableMessage::isContentReleased()const { return releaseMgr.isReleased(); }
>
> -bool PersistableMessage::isContentReleased()const { return contentReleased; }
> -	
>   bool PersistableMessage::isEnqueueComplete() {
>       sys::ScopedLock<sys::Mutex>  l(asyncEnqueueLock);
>       return asyncEnqueueCounter == 0;
> @@ -85,8 +94,8 @@
>               for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
>                   PersistableQueue::shared_ptr q(i->lock());
>                   if (q) q->notifyDurableIOComplete();
> -            }
> -        }
> +            }
> +        }
>       }
>   }
>
> @@ -95,13 +104,13 @@
>           for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
>               PersistableQueue::shared_ptr q(i->lock());
>               if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
> -        }
> -    }
> +        }
> +    }
>       return false;
>   }
>
>
> -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
> +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
>       if (_store){
>           sys::ScopedLock<sys::Mutex>  l(storeLock);
>           store = _store;
> @@ -110,22 +119,22 @@
>       }
>   }
>
> -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
> +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
>       addToSyncList(queue, _store);
>       enqueueAsync();
>   }
>
> -void PersistableMessage::enqueueAsync() {
> +void PersistableMessage::enqueueAsync() {
>       sys::ScopedLock<sys::Mutex>  l(asyncEnqueueLock);
> -    asyncEnqueueCounter++;
> +    asyncEnqueueCounter++;
>   }
>
> -bool PersistableMessage::isDequeueComplete() {
> +bool PersistableMessage::isDequeueComplete() {
>       sys::ScopedLock<sys::Mutex>  l(asyncDequeueLock);
>       return asyncDequeueCounter == 0;
>   }
> -
> -void PersistableMessage::dequeueComplete() {
> +
> +void PersistableMessage::dequeueComplete() {
>       bool notify = false;
>       {
>           sys::ScopedLock<sys::Mutex>  l(asyncDequeueLock);
> @@ -138,7 +147,7 @@
>       if (notify) allDequeuesComplete();
>   }
>
> -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
> +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
>       if (_store){
>           sys::ScopedLock<sys::Mutex>  l(storeLock);
>           store = _store;
> @@ -148,9 +157,9 @@
>       dequeueAsync();
>   }
>
> -void PersistableMessage::dequeueAsync() {
> +void PersistableMessage::dequeueAsync() {
>       sys::ScopedLock<sys::Mutex>  l(asyncDequeueLock);
> -    asyncDequeueCounter++;
> +    asyncDequeueCounter++;
>   }
>
>   }}
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Sep 11 13:33:42 2009
> @@ -10,9 +10,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -31,6 +31,7 @@
>   #include "qpid/framing/amqp_types.h"
>   #include "qpid/sys/Mutex.h"
>   #include "qpid/broker/PersistableQueue.h"
> +#include "qpid/broker/MessageReleaseManager.h"
>
>   namespace qpid {
>   namespace broker {
> @@ -46,7 +47,7 @@
>       sys::Mutex asyncEnqueueLock;
>       sys::Mutex asyncDequeueLock;
>       sys::Mutex storeLock;
> -	
> +
>       /**
>        * Tracks the number of outstanding asynchronous enqueue
>        * operations. When the message is enqueued asynchronously the
> @@ -68,7 +69,6 @@
>       void enqueueAsync();
>       void dequeueAsync();
>
> -    bool contentReleased;
>       syncList synclist;
>
>     protected:
> @@ -81,6 +81,8 @@
>
>       MessageStore* store;
>
> +    MessageReleaseManager releaseMgr;
> +
>
>     public:
>       typedef boost::shared_ptr<PersistableMessage>  shared_ptr;
> @@ -95,9 +97,15 @@
>       PersistableMessage();
>
>       void flush();
> -
> +
> +    bool requestContentRelease();
> +
>       bool isContentReleased() const;
> -	
> +
> +    void blockRelease();
> +
> +    virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
> +
>       QPID_BROKER_EXTERN bool isEnqueueComplete();
>
>       QPID_BROKER_EXTERN void enqueueComplete();
> @@ -107,16 +115,16 @@
>
>
>       QPID_BROKER_EXTERN bool isDequeueComplete();
> -
> +
>       QPID_BROKER_EXTERN void dequeueComplete();
>
>       QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
>                                            MessageStore* _store);
>
>       bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
> -
> +
>       void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
> -
> +
>   };
>
>   }}
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h Fri Sep 11 13:33:42 2009
> @@ -10,9 +10,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -55,18 +55,18 @@
>
>       virtual const std::string&  getName() const = 0;
>       virtual ~PersistableQueue() {
> -        if (externalQueueStore)
> +        if (externalQueueStore)
>                delete externalQueueStore;
>       };
>
>       virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
> -
> +
>       inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
> -
> +
>       PersistableQueue():externalQueueStore(NULL){
>       };
> -
> -
> +
> +
>       /**
>       * call back to signal async AIO writes have
>       * completed (enqueue/dequeue etc)
> @@ -76,9 +76,9 @@
>       */
>       virtual void notifyDurableIOComplete()  = 0;
>   protected:
> -
> +
>       ExternalQueueStore* externalQueueStore;
> -
> +
>   };
>
>   }}
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -56,7 +56,7 @@
>   namespace _qmf = qmf::org::apache::qpid::broker;
>
>
> -namespace
> +namespace
>   {
>   const std::string qpidMaxSize("qpid.max_size");
>   const std::string qpidMaxCount("qpid.max_count");
> @@ -76,16 +76,16 @@
>   const int ENQUEUE_AND_DEQUEUE=2;
>   }
>
> -Queue::Queue(const string&  _name, bool _autodelete,
> +Queue::Queue(const string&  _name, bool _autodelete,
>                MessageStore* const _store,
>                const OwnershipToken* const _owner,
>                Manageable* parent,
>                Broker* b) :
>
> -    name(_name),
> +    name(_name),
>       autodelete(_autodelete),
>       store(_store),
> -    owner(_owner),
> +    owner(_owner),
>       consumerCount(0),
>       exclusive(0),
>       noLocal(false),
> @@ -182,9 +182,9 @@
>
>   void Queue::recover(boost::intrusive_ptr<Message>&  msg){
>       push(msg, true);
> -    if (store){
> +    if (store){
>           // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
> -        msg->addToSyncList(shared_from_this(), store);
> +        msg->addToSyncList(shared_from_this(), store);
>       }
>       msg->enqueueComplete(); // mark the message as enqueued
>       mgntEnqStats(msg);
> @@ -192,7 +192,7 @@
>       if (store&&  !msg->isContentLoaded()) {
>           //content has not been loaded, need to ensure that lazy loading mode is set:
>           //TODO: find a nicer way to do this
> -        msg->releaseContent(store);
> +        msg->releaseContent(true);
>       }
>   }
>
> @@ -209,13 +209,13 @@
>       if (!isEnqueued(msg)) return;
>
>       QueueListeners::NotificationSet copy;
> -    {
> +    {
>           Mutex::ScopedLock locker(messageLock);
>           msg.payload->enqueueComplete(); // mark the message as enqueued
>           messages.push_front(msg);
>           listeners.populate(copy);
>
> -        // for persistLastNode - don't force a message twice to disk, but force it if no force before
> +        // for persistLastNode - don't force a message twice to disk, but force it if no force before
>           if(inLastNodeFailure&&  persistLastNode&&  !msg.payload->isStoredOnQueue(shared_from_this())) {
>               msg.payload->forcePersistent();
>               if (msg.payload->isForcedPersistent() ){
> @@ -234,7 +234,7 @@
>       }
>   }
>
> -bool Queue::acquireMessageAt(const SequenceNumber&  position, QueuedMessage&  message)
> +bool Queue::acquireMessageAt(const SequenceNumber&  position, QueuedMessage&  message)
>   {
>       Mutex::ScopedLock locker(messageLock);
>       QPID_LOG(debug, "Attempting to acquire message at "<<  position);
> @@ -258,7 +258,7 @@
>       QPID_LOG(debug, "attempting to acquire "<<  msg.position);
>       for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>           if ((i->position == msg.position&&  !lastValueQueue) // note that in some cases payload not be set
> -            || (lastValueQueue&&  (i->position == msg.position)&&
> +            || (lastValueQueue&&  (i->position == msg.position)&&
>                   msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>
>               clearLVQIndex(msg);
> @@ -296,7 +296,7 @@
>             case NO_MESSAGES:
>             default:
>               return false;
> -        }
> +        }
>       } else {
>           return browseNextMessage(m, c);
>       }
> @@ -317,7 +317,7 @@
>               //enqueued and so is not available for consumption yet,
>               //register consumer for notification when this changes
>               listeners.addListener(c);
> -            return false;
> +            return false;
>           } else {
>               //check that consumer has sufficient credit for the
>               //message (if it does not, no need to register it for
> @@ -332,7 +332,7 @@
>   {
>       while (true) {
>           Mutex::ScopedLock locker(messageLock);
> -        if (messages.empty()) {
> +        if (messages.empty()) {
>               QPID_LOG(debug, "No messages to dispatch on queue '"<<  name<<  "'");
>               listeners.addListener(c);
>               return NO_MESSAGES;
> @@ -345,7 +345,7 @@
>               }
>
>               if (c->filter(msg.payload)) {
> -                if (c->accept(msg.payload)) {
> +                if (c->accept(msg.payload)) {
>                       m = msg;
>                       popMsg(msg);
>                       return CONSUMED;
> @@ -358,7 +358,7 @@
>                   //consumer will never want this message
>                   QPID_LOG(debug, "Consumer doesn't want message from '"<<  name<<  "'");
>                   return CANT_CONSUME;
> -            }
> +            }
>           }
>       }
>   }
> @@ -423,7 +423,7 @@
>           if (c->position<  getFront().position) {
>               msg = getFront();
>               return true;
> -        } else {
> +        } else {
>               //TODO: can improve performance of this search, for now just searching linearly from end
>               Messages::reverse_iterator pos;
>               for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend()&&  i->position>  c->position; i++) {
> @@ -524,7 +524,7 @@
>    */
>   uint32_t Queue::purge(const uint32_t purge_request){
>       Mutex::ScopedLock locker(messageLock);
> -    uint32_t purge_count = purge_request; // only comes into play if>0
> +    uint32_t purge_count = purge_request; // only comes into play if>0
>
>       uint32_t count = 0;
>       // Either purge them all or just the some (purge_count) while the queue isn't empty.
> @@ -537,7 +537,7 @@
>
>   uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
>       Mutex::ScopedLock locker(messageLock);
> -    uint32_t move_count = qty; // only comes into play if  qty>0
> +    uint32_t move_count = qty; // only comes into play if  qty>0
>       uint32_t count = 0; // count how many were moved for returning
>
>       while((!qty || move_count--)&&  !messages.empty()) {
> @@ -566,15 +566,16 @@
>       Messages dequeues;
>       QueueListeners::NotificationSet copy;
>       {
> -        Mutex::ScopedLock locker(messageLock);
> +        Mutex::ScopedLock locker(messageLock);
>           QueuedMessage qm(this, msg, ++sequence);
> +        msg->setStore(store);
>           if (policy.get()) {
>               policy->tryEnqueue(qm);
>               //depending on policy, may have some dequeues
>               if (!isRecovery) pendingDequeues.swap(dequeues);
>           }
>           if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
> -
> +
>           LVQ::iterator i;
>           const framing::FieldTable* ft = msg->getApplicationHeaders();
>           if (lastValueQueue&&  ft){
> @@ -584,7 +585,7 @@
>               if (i == lvq.end() || msg->isUpdateMessage()){
>                   messages.push_back(qm);
>                   listeners.populate(copy);
> -                lvq[key] = msg;
> +                lvq[key] = msg;
>               }else {
>                   boost::intrusive_ptr<Message>  old = i->second->getReplacementMessage(this);
>                   if (!old) old = i->second;
> @@ -594,10 +595,10 @@
>                       //recovery is complete
>                       pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
>                   } else {
> -                    Mutex::ScopedUnlock u(messageLock);
> +                    Mutex::ScopedUnlock u(messageLock);
>                       dequeue(0, QueuedMessage(qm.queue, old, qm.position));
>                   }
> -            }		
> +            }
>           }else {
>               messages.push_back(qm);
>               listeners.populate(copy);
> @@ -632,8 +633,8 @@
>           if (ft) {
>               string key = ft->getAsString(qpidVQMatchProperty);
>               if (lvq.find(key) != lvq.end()){
> -                lvq[key] = replacement;
> -            }
> +                lvq[key] = replacement;
> +            }
>           }
>           msg.payload = replacement;
>       }
> @@ -644,7 +645,7 @@
>   uint32_t Queue::getMessageCount() const
>   {
>       Mutex::ScopedLock locker(messageLock);
> -
> +
>       uint32_t count = 0;
>       for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
>           //NOTE: don't need to use checkLvqReplace() here as it
> @@ -652,7 +653,7 @@
>           //so the enqueueComplete check has no effect
>           if ( i->payload->isEnqueueComplete() ) count ++;
>       }
> -
> +
>       return count;
>   }
>
> @@ -696,13 +697,13 @@
>       }
>   }
>
> -// return true if store exists,
> +// return true if store exists,
>   bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>  msg)
>   {
>       if (inLastNodeFailure&&  persistLastNode){
>           msg->forcePersistent();
>       }
> -	
> +
>       if (traceId.size()) {
>           msg->addTraceId(traceId);
>       }
> @@ -716,13 +717,13 @@
>       return false;
>   }
>
> -// return true if store exists,
> +// return true if store exists,
>   bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage&  msg)
>   {
>       {
>           Mutex::ScopedLock locker(messageLock);
>           if (!isEnqueued(msg)) return false;
> -        if (!ctxt) {
> +        if (!ctxt) {
>               dequeued(msg);
>           }
>       }
> @@ -738,7 +739,7 @@
>   void Queue::dequeueCommitted(const QueuedMessage&  msg)
>   {
>       Mutex::ScopedLock locker(messageLock);
> -    dequeued(msg);
> +    dequeued(msg);
>       if (mgmtObject != 0) {
>           mgmtObject->inc_msgTxnDequeues();
>           mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
> @@ -794,7 +795,7 @@
>           QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
>           lastValueQueue = lastValueQueueNoBrowse;
>       }
> -
> +
>       persistLastNode= _settings.get(qpidPersistLastNode);
>       if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
>
> @@ -803,7 +804,7 @@
>       if (excludeList.size()) {
>           split(traceExclude, excludeList, ", ");
>       }
> -    QPID_LOG(debug, "Configured queue "<<  getName()<<  " with qpid.trace.id='"<<  traceId
> +    QPID_LOG(debug, "Configured queue "<<  getName()<<  " with qpid.trace.id='"<<  traceId
>                <<  "' and qpid.trace.exclude='"<<  excludeList<<  "' i.e. "<<  traceExclude.size()<<  " elements");
>
>       eventMode = _settings.getAsInt(qpidQueueEventGeneration);
> @@ -859,9 +860,9 @@
>       return policy.get();
>   }
>
> -uint64_t Queue::getPersistenceId() const
> -{
> -    return persistenceId;
> +uint64_t Queue::getPersistenceId() const
> +{
> +    return persistenceId;
>   }
>
>   void Queue::setPersistenceId(uint64_t _persistenceId) const
> @@ -880,18 +881,18 @@
>       persistenceId = _persistenceId;
>   }
>
> -void Queue::encode(Buffer&  buffer) const
> +void Queue::encode(Buffer&  buffer) const
>   {
>       buffer.putShortString(name);
>       buffer.put(settings);
> -    if (policy.get()) {
> +    if (policy.get()) {
>           buffer.put(*policy);
>       }
>   }
>
>   uint32_t Queue::encodedSize() const
>   {
> -    return name.size() + 1/*short string size octet*/ + settings.encodedSize()
> +    return name.size() + 1/*short string size octet*/ + settings.encodedSize()
>           + (policy.get() ? (*policy).encodedSize() : 0);
>   }
>
> @@ -922,50 +923,50 @@
>
>   void Queue::tryAutoDelete(Broker&  broker, Queue::shared_ptr queue)
>   {
> -    if (broker.getQueues().destroyIf(queue->getName(),
> +    if (broker.getQueues().destroyIf(queue->getName(),
>                                        boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
>           queue->unbind(broker.getExchanges(), queue);
>           queue->destroy();
>       }
>   }
>
> -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
> -{
> +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
> +{
>       Mutex::ScopedLock locker(ownershipLock);
> -    return o == owner;
> +    return o == owner;
>   }
>
> -void Queue::releaseExclusiveOwnership()
> -{
> +void Queue::releaseExclusiveOwnership()
> +{
>       Mutex::ScopedLock locker(ownershipLock);
> -    owner = 0;
> +    owner = 0;
>   }
>
> -bool Queue::setExclusiveOwner(const OwnershipToken* const o)
> -{
> +bool Queue::setExclusiveOwner(const OwnershipToken* const o)
> +{
>       Mutex::ScopedLock locker(ownershipLock);
>       if (owner) {
>           return false;
>       } else {
> -        owner = o;
> +        owner = o;
>           return true;
>       }
>   }
>
> -bool Queue::hasExclusiveOwner() const
> -{
> +bool Queue::hasExclusiveOwner() const
> +{
>       Mutex::ScopedLock locker(ownershipLock);
> -    return owner != 0;
> +    return owner != 0;
>   }
>
> -bool Queue::hasExclusiveConsumer() const
> -{
> -    return exclusive;
> +bool Queue::hasExclusiveConsumer() const
> +{
> +    return exclusive;
>   }
>
>   void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
> -    if (externalQueueStore!=inst&&  externalQueueStore)
> -        delete externalQueueStore;
> +    if (externalQueueStore!=inst&&  externalQueueStore)
> +        delete externalQueueStore;
>       externalQueueStore = inst;
>
>       if (inst) {
> @@ -975,19 +976,6 @@
>       }
>   }
>
> -bool Queue::releaseMessageContent(const QueuedMessage&  m)
> -{
> -    if (store&&  !NullMessageStore::isNullStore(store)) {
> -        QPID_LOG(debug, "Message "<<  m.position<<  " on "<<  name<<  " released from memory");
> -        m.payload->releaseContent(store);
> -        return true;
> -    } else {
> -        QPID_LOG(warning, "Message "<<  m.position<<  " on "<<  name
> -<<  " cannot be released from memory as the queue is not durable");
> -        return false;
> -    }
> -}
> -
>   ManagementObject* Queue::GetManagementObject (void) const
>   {
>       return (ManagementObject*) mgmtObject;
> @@ -1062,7 +1050,7 @@
>   void Queue::addPendingDequeue(const QueuedMessage&  msg)
>   {
>       //assumes lock is held - true at present but rather nasty as this is a public method
> -    pendingDequeues.push_back(msg);
> +    pendingDequeues.push_back(msg);
>   }
>
>   QueueListeners&  Queue::getListeners() { return listeners; }
>
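
The concern about msg->setStore(store) in Queue::push(), raised near the top of this mail, can be shown with a small standalone sketch. SketchStore, SketchMessage and SketchQueue are hypothetical stand-ins, not the broker classes, and pushGuarded() is only one possible form of the "simple check"; I still think push() is the wrong place for this association.

```cpp
#include <cassert>
#include <cstddef>

struct SketchStore {};  // hypothetical stand-in for MessageStore

struct SketchMessage {  // hypothetical stand-in for the message
    SketchStore* store;
    SketchMessage() : store(NULL) {}
    void setStore(SketchStore* s) { store = s; }
};

struct SketchQueue {    // hypothetical stand-in for Queue
    SketchStore* store; // NULL for a transient queue
    explicit SketchQueue(SketchStore* s) : store(s) {}

    // Mirrors the line added in the diff: unconditional overwrite.
    void push(SketchMessage& m) { m.setStore(store); }

    // One possible guard: never replace a store with NULL.
    void pushGuarded(SketchMessage& m) { if (store) m.setStore(store); }
};

// Durable queue first, then transient queue, using the unconditional push.
bool storeSurvivesUnconditionalPush() {
    SketchStore disk;
    SketchQueue durable(&disk), transient(NULL);
    SketchMessage m;
    durable.push(m);
    transient.push(m);          // overwrites the store pointer with NULL
    return m.store == &disk;
}

// Same routing order, using the guarded push.
bool storeSurvivesGuardedPush() {
    SketchStore disk;
    SketchQueue durable(&disk), transient(NULL);
    SketchMessage m;
    durable.pushGuarded(m);
    transient.pushGuarded(m);   // leaves the existing store in place
    return m.store == &disk;
}

// Checks both outcomes.
void checkStoreOverwrite() {
    assert(!storeSurvivesUnconditionalPush());
    assert(storeSurvivesGuardedPush());
}
```

The guard only covers the null case. It does not address a message that passes through queues with different non-null stores, nor the alternate-exchange case mentioned earlier.
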
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Sep 11 13:33:42 2009
> @@ -10,9 +10,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -156,8 +156,8 @@
>               typedef std::vector<shared_ptr>  vector;
>
>               QPID_BROKER_EXTERN Queue(const string&  name,
> -                                     bool autodelete = false,
> -                                     MessageStore* const store = 0,
> +                                     bool autodelete = false,
> +                                     MessageStore* const store = 0,
>                                        const OwnershipToken* const owner = 0,
>                                        management::Manageable* parent = 0,
>                                        Broker* broker = 0);
> @@ -213,11 +213,11 @@
>                                               bool exclusive = false);
>               QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
>
> -            uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
> +            uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
>               QPID_BROKER_EXTERN void purgeExpired();
>
>               //move qty # of messages to destination Queue destq
> -            uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
> +            uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>
>               QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>               QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
> @@ -254,8 +254,8 @@
>                * Inform queue of messages that were enqueued, have since
>                * been acquired but not yet accepted or released (and
>                * thus are still logically on the queue) - used in
> -             * clustered broker.
> -             */
> +             * clustered broker.
> +             */
>               void enqueued(const QueuedMessage&  msg);
>
>               /**
> @@ -266,9 +266,9 @@
>                * accepted it).
>                */
>               bool isEnqueued(const QueuedMessage&  msg);
> -
> +
>               /**
> -             * Gets the next available message
> +             * Gets the next available message
>                */
>               QPID_BROKER_EXTERN QueuedMessage get();
>
> @@ -315,8 +315,6 @@
>                   bindings.eachBinding(f);
>               }
>
> -            bool releaseMessageContent(const QueuedMessage&);
> -
>               void popMsg(QueuedMessage&  qmsg);
>
>               /** Set the position sequence number  for the next message on the queue.
> @@ -340,7 +338,7 @@
>                * queues. It is used for dequeueing messages in response
>                * to an enqueue while avoid holding lock over call to
>                * store.
> -             *
> +             *
>                * Assumes messageLock is held - true for curent use case
>                * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
>                * method
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -28,7 +28,7 @@
>   using namespace qpid::broker;
>   using namespace qpid::framing;
>
> -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string&  _type) :
> +QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string&  _type) :
>       maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
>
>   void QueuePolicy::enqueued(uint64_t _size)
> @@ -89,7 +89,7 @@
>       } else {
>           std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
>           throw ResourceLimitExceededException(
> -            QPID_MSG("Policy exceeded on "<<  queue<<  " by message "<<  m.position
> +            QPID_MSG("Policy exceeded on "<<  queue<<  " by message "<<  m.position
>                        <<  " of size "<<  m.payload->contentSize()<<  " , policy: "<<  *this));
>       }
>   }
> @@ -129,7 +129,7 @@
>       FieldTable::ValuePtr v = settings.get(typeKey);
>       if (v&&  v->convertsTo<std::string>()) {
>           std::string t = v->get<std::string>();
> -        std::transform(t.begin(), t.end(), t.begin(), tolower);
> +        std::transform(t.begin(), t.end(), t.begin(), tolower);
>           if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
>       }
>       return FLOW_TO_DISK;
> @@ -152,7 +152,7 @@
>     buffer.putLongLong(size.get());
>   }
>
> -void QueuePolicy::decode ( Buffer&  buffer )
> +void QueuePolicy::decode ( Buffer&  buffer )
>   {
>     maxCount = buffer.getLong();
>     maxSize  = buffer.getLongLong();
> @@ -179,15 +179,15 @@
>   const std::string QueuePolicy::RING_STRICT("ring_strict");
>   uint64_t QueuePolicy::defaultMaxSize(0);
>
> -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
> +FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
>       QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
>
>   bool FlowToDiskPolicy::checkLimit(const QueuedMessage&  m)
>   {
> -    return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
> +    return QueuePolicy::checkLimit(m) || (m.queue->getPersistenceId() && m.payload->requestContentRelease());
>   }
>
> -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string&  _type) :
> +RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string&  _type) :
>       QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
>
>   bool before(const QueuedMessage&  a, const QueuedMessage&  b)
> @@ -219,19 +219,19 @@
>       //for non-strict ring policy, a message can be replaced (and
>       //therefore dequeued) before it is accepted or released by
>       //subscriber; need to detect this
> -    return find(m, pendingDequeues, false) || find(m, queue, false);
> +    return find(m, pendingDequeues, false) || find(m, queue, false);
>   }
>
>   bool RingQueuePolicy::checkLimit(const QueuedMessage&  m)
>   {
>       if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
> -
> +
>       QueuedMessage oldest;
>       {
>           qpid::sys::Mutex::ScopedLock l(lock);
>           if (queue.empty()) {
> -            QPID_LOG(debug, "Message too large for ring queue "
> -<<  (m.queue ? m.queue->getName() : std::string("unknown queue"))
> +            QPID_LOG(debug, "Message too large for ring queue "
> +<<  (m.queue ? m.queue->getName() : std::string("unknown queue"))
>                        <<  " ["<<  *this<<  "]"
>                        <<  ": message size = "<<  m.payload->contentSize()<<  " bytes");
>               return false;
> @@ -251,13 +251,13 @@
>               pendingDequeues.push_back(oldest);
>           }
>           oldest.queue->addPendingDequeue(oldest);
> -        QPID_LOG(debug, "Ring policy triggered in queue "
> +        QPID_LOG(debug, "Ring policy triggered in queue "
>                    <<  (m.queue ? m.queue->getName() : std::string("unknown queue"))
>                    <<  ": removed message "<<  oldest.position<<  " to make way for "<<  m.position);
>           return true;
>       } else {
> -        QPID_LOG(debug, "Ring policy could not be triggered in queue "
> -<<  (m.queue ? m.queue->getName() : std::string("unknown queue"))
> +        QPID_LOG(debug, "Ring policy could not be triggered in queue "
> +<<  (m.queue ? m.queue->getName() : std::string("unknown queue"))
>                    <<  ": oldest message (seq-no="<<  oldest.position<<  ") has been delivered but not yet acknowledged or requeued");
>           //in strict mode, if oldest message has been delivered (hence
>           //cannot be acquired) but not yet acked, it should not be
> @@ -299,7 +299,7 @@
>       }
>
>   }
> -
> +
>   namespace qpid {
>       namespace broker {
>
> @@ -309,7 +309,7 @@
>       else out<<  "size: unlimited";
>       out<<  "; ";
>       if (p.maxCount) out<<  "count: max="<<  p.maxCount<<  ", current="<<  p.count.get();
> -    else out<<  "count: unlimited";
> +    else out<<  "count: unlimited";
>       out<<  "; type="<<  p.type;
>       return out;
>   }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -35,7 +35,7 @@
>   namespace broker {
>
>   RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry&  _queues, ExchangeRegistry&  _exchanges, LinkRegistry&  _links,
> -                                         DtxManager&  _dtxMgr, uint64_t _stagingThreshold)
> +                                         DtxManager&  _dtxMgr, uint64_t _stagingThreshold)
>       : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
>
>   RecoveryManagerImpl::~RecoveryManagerImpl() {}
> @@ -45,7 +45,7 @@
>       intrusive_ptr<Message>  msg;
>       const uint64_t stagingThreshold;
>   public:
> -    RecoverableMessageImpl(const intrusive_ptr<Message>&  _msg, uint64_t _stagingThreshold);
> +    RecoverableMessageImpl(const intrusive_ptr<Message>&  _msg, uint64_t _stagingThreshold);
>       ~RecoverableMessageImpl() {};
>       void setPersistenceId(uint64_t id);
>       bool loadContent(uint64_t available);
> @@ -61,7 +61,7 @@
>   public:
>       RecoverableQueueImpl(const boost::shared_ptr<Queue>&  _queue) : queue(_queue) {}
>       ~RecoverableQueueImpl() {};
> -    void setPersistenceId(uint64_t id);
> +    void setPersistenceId(uint64_t id);
>   	uint64_t getPersistenceId() const;
>       const std::string&  getName() const;
>       void setExternalQueueStore(ExternalQueueStore* inst);
> @@ -129,10 +129,10 @@
>   {
>       boost::intrusive_ptr<Message>  message(new Message());
>       message->decodeHeader(buffer);
> -    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
> +    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
>   }
>
> -RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string&  xid,
> +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string&  xid,
>                                                                              std::auto_ptr<TPCTransactionContext>  txn)
>   {
>       DtxBuffer::shared_ptr buffer(new DtxBuffer());
> @@ -159,7 +159,7 @@
>       queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
>   }
>
> -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>&  _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
> +RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>&  _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
>   {
>       if (!msg->isPersistent()) {
>           msg->forcePersistent(); // set so that message will get dequeued from store.
> @@ -195,7 +195,7 @@
>   {
>       queue->setPersistenceId(id);
>   }
> -	
> +
>   uint64_t RecoverableQueueImpl::getPersistenceId() const
>   {
>   	return queue->getPersistenceId();
> @@ -205,7 +205,7 @@
>   {
>       return queue->getName();
>   }
> -
> +
>   void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
>   {
>       queue->setExternalQueueStore(inst);
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Sep 11 13:33:42 2009
> @@ -86,7 +86,7 @@
>       return consumers.find(consumerTag) != consumers.end();
>   }
>
> -void SemanticState::consume(const string&  tag,
> +void SemanticState::consume(const string&  tag,
>                               Queue::shared_ptr queue, bool ackRequired, bool acquire,
>                               bool exclusive, const string&  resumeId, uint64_t resumeTtl, const FieldTable&  arguments)
>   {
> @@ -103,7 +103,7 @@
>           //should cancel all unacked messages for this consumer so that
>           //they are not redelivered on recovery
>           for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
> -
> +
>       }
>   }
>
> @@ -173,7 +173,7 @@
>           dtxBuffer->fail();
>       } else {
>           dtxBuffer->markEnded();
> -    }
> +    }
>       dtxBuffer.reset();
>   }
>
> @@ -233,9 +233,9 @@
>
>   const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
>
> -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
> -                                          const string&  _name,
> -                                          Queue::shared_ptr _queue,
> +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
> +                                          const string&  _name,
> +                                          Queue::shared_ptr _queue,
>                                             bool ack,
>                                             bool _acquire,
>                                             bool _exclusive,
> @@ -244,20 +244,20 @@
>                                             const framing::FieldTable&  _arguments
>
>
> -) :
> +) :
>       Consumer(_acquire),
> -    parent(_parent),
> -    name(_name),
> -    queue(_queue),
> -    ackExpected(ack),
> +    parent(_parent),
> +    name(_name),
> +    queue(_queue),
> +    ackExpected(ack),
>       acquire(_acquire),
> -    blocked(true),
> +    blocked(true),
>       windowing(true),
>       exclusive(_exclusive),
>       resumeId(_resumeId),
>       resumeTtl(_resumeTtl),
>       arguments(_arguments),
> -    msgCredit(0),
> +    msgCredit(0),
>       byteCredit(0),
>       notifyEnabled(true),
>       syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
> @@ -279,7 +279,7 @@
>       if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
>       if (windowing || ackExpected || !acquire) {
>           parent->record(record);
> -    }
> +    }
>       if (acquire&&  !ackExpected) {
>           queue->dequeue(0, msg);
>       }
> @@ -297,7 +297,7 @@
>       // checkCredit fails because the message is to big, we should
>       // remain on queue's listener list for possible smaller messages
>       // in future.
> -    //
> +    //
>       blocked = !(filter(msg)&&  checkCredit(msg));
>       return !blocked;
>   }
> @@ -305,7 +305,7 @@
>   void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>&  msg)
>   {
>       uint32_t originalMsgCredit = msgCredit;
> -    uint32_t originalByteCredit = byteCredit;
> +    uint32_t originalByteCredit = byteCredit;
>       if (msgCredit != 0xFFFFFFFF) {
>           msgCredit--;
>       }
> @@ -315,13 +315,13 @@
>       QPID_LOG(debug, "Credit allocated for '"<<  name<<  "' on "<<  parent
>                <<  ", was "<<  " bytes: "<<  originalByteCredit<<  " msgs: "<<  originalMsgCredit
>                <<  " now bytes: "<<  byteCredit<<  " msgs: "<<  msgCredit);
> -
> +
>   }
>
>   bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>&  msg)
>   {
>       if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF&&  byteCredit<  msg->getRequiredCredit())) {
> -        QPID_LOG(debug, "Not enough credit for '"<<  name<<  "' on "<<  parent
> +        QPID_LOG(debug, "Not enough credit for '"<<  name<<  "' on "<<  parent
>                    <<  ", bytes: "<<  byteCredit<<  " msgs: "<<  msgCredit);
>           return false;
>       } else {
> @@ -341,7 +341,7 @@
>       Queue::shared_ptr queue = c->getQueue();
>       if(queue) {
>           queue->cancel(c);
> -        if (queue->canAutoDelete()&&  !queue->hasExclusiveOwner()) {
> +        if (queue->canAutoDelete()&&  !queue->hasExclusiveOwner()) {
>               Queue::tryAutoDelete(session.getBroker(), queue);
>           }
>       }
> @@ -366,7 +366,7 @@
>
>   void SemanticState::route(intrusive_ptr<Message>  msg, Deliverable&  strategy) {
>       msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
> -
> +
>       std::string exchangeName = msg->getExchangeName();
>       if (!cacheExchange || cacheExchange->getName() != exchangeName)
>           cacheExchange = session.getBroker().getExchanges().get(exchangeName);
> @@ -393,7 +393,7 @@
>
>       if (!strategy.delivered) {
>           //TODO:if discard-unroutable, just drop it
> -        //TODO:else if accept-mode is explicit, reject it
> +        //TODO:else if accept-mode is explicit, reject it
>           //else route it to alternate exchange
>           if (cacheExchange->getAlternate()) {
>               cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
> @@ -402,7 +402,7 @@
>               msg->destroy();
>           }
>       }
> -
> +    msg->releaseContent(); // release frames if release has been requested
>   }
>
>   void SemanticState::requestDispatch()
> @@ -421,7 +421,7 @@
>   }
>
>   bool SemanticState::complete(DeliveryRecord&  delivery)
> -{
> +{
>       ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
>       if (i != consumers.end()) {
>           i->second->complete(delivery);
> @@ -449,7 +449,7 @@
>           unacked.clear();
>           for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
>       }else{
> -        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
> +        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
>           //unconfirmed messages re redelivered and therefore have their
>           //id adjusted, confirmed messages are not and so the ordering
>           //w.r.t id is lost
> @@ -570,7 +570,7 @@
>   }
>
>   AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
> -{
> +{
>       return DeliveryRecord::findRange(unacked, first, last);
>   }
>
> @@ -655,13 +655,13 @@
>           //in transactional mode, don't dequeue or remove, just
>           //maintain set of acknowledged messages:
>           accumulatedAck.add(commands);
> -
> +
>           if (dtxBuffer.get()) {
>               //if enlisted in a dtx, copy the relevant slice from
>               //unacked and record it against that transaction
>               TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
>               accumulatedAck.clear();
> -            dtxBuffer->enlist(txAck);
> +            dtxBuffer->enlist(txAck);
>
>               //mark the relevant messages as 'ended' in unacked
>               //if the messages are already completed, they can be
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -36,7 +36,7 @@
>   // - excessive string copying: should be 0 copy, match from original buffer.
>   // - match/lookup: use descision tree or other more efficient structure.
>
> -namespace
> +namespace
>   {
>   const std::string qpidFedOp("qpid.fed.op");
>   const std::string qpidFedTags("qpid.fed.tags");
> @@ -53,7 +53,7 @@
>   // Iterate over a string of '.'-separated tokens.
>   struct TokenIterator {
>       typedef pair<const char*,const char*>  Token;
> -
> +
>       TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {}
>
>       bool finished() const { return !token.first; }
> @@ -122,7 +122,7 @@
>       Matcher(const string&  p, const string&  k)
>           : matched(), pattern(&p[0],&p[0]+p.size()), key(&k[0],&k[0]+k.size())
>       { matched = match(); }
> -
> +
>       operator bool() const { return matched; }
>
>     private:
> @@ -158,7 +158,7 @@
>           }
>           if (!pattern.finished()&&  pattern.match1('#'))
>               pattern.next();     // Trailing # matches empty.
> -        return pattern.finished()&&  key.finished();
> +        return pattern.finished()&&  key.finished();
>       }
>
>       bool matched;
> @@ -173,7 +173,7 @@
>       return normal;
>   }
>
> -bool TopicExchange::match(const string&  pattern, const string&  key)
> +bool TopicExchange::match(const string&  pattern, const string&  key)
>   {
>       return Matcher(pattern, key);
>   }
> @@ -231,11 +231,11 @@
>            */
>           std::vector<std::string>  keys2prop;
>           {
> -            RWlock::ScopedRlock l(lock);
> +            RWlock::ScopedRlock l(lock);
>               for (BindingMap::iterator iter = bindings.begin();
>                    iter != bindings.end(); iter++) {
>                   const BoundKey&  bk = iter->second;
> -
> +
>                   if (bk.fedBinding.hasLocal()) {
>                       keys2prop.push_back(iter->first);
>                   }
> @@ -293,44 +293,24 @@
>       return q != qv.end();
>   }
>
> -void TopicExchange::route(Deliverable&  msg, const string&  routingKey, const FieldTable* /*args*/){
> -    Binding::vector mb;
> +void TopicExchange::route(Deliverable&  msg, const string&  routingKey, const FieldTable* /*args*/)
> +{
> +    qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding>  >);
>       PreRoute pr(msg, this);
> -    uint32_t count(0);
>
>       {
> -    RWlock::ScopedRlock l(lock);
> -    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
> -        if (match(i->first, routingKey)) {
> -            Binding::vector&  qv(i->second.bindingVector);
> -            for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
> -                mb.push_back(*j);
> +        RWlock::ScopedRlock l(lock);
> +        for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
> +            if (match(i->first, routingKey)) {
> +                Binding::vector&  qv(i->second.bindingVector);
> +                for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
> +                    b->push_back(*j);
> +                }
>               }
>           }
>       }
> -    }
> -
> -    for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
> -        msg.deliverTo((*j)->queue);
> -        if ((*j)->mgmtBinding != 0)
> -            (*j)->mgmtBinding->inc_msgMatched ();
> -    }
>
> -    if (mgmtExchange != 0)
> -    {
> -        mgmtExchange->inc_msgReceives  ();
> -        mgmtExchange->inc_byteReceives (msg.contentSize ());
> -        if (count == 0)
> -        {
> -            mgmtExchange->inc_msgDrops  ();
> -            mgmtExchange->inc_byteDrops (msg.contentSize ());
> -        }
> -        else
> -        {
> -            mgmtExchange->inc_msgRoutes  (count);
> -            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
> -        }
> -    }
> +    doRoute(msg, b);
>   }
>
>   bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
> @@ -343,7 +323,7 @@
>           return bindings.size()>  0;
>       } else if (routingKey) {
>           for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
> -            if (match(i->first, *routingKey))
> +            if (match(i->first, *routingKey))
>                   return true;
>               }
>       } else {
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h Fri Sep 11 13:33:42 2009
> @@ -10,9 +10,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -39,6 +39,7 @@
>   {
>   public:
>       typedef boost::shared_ptr<const std::vector<T>  >  ConstPtr;
> +    typedef boost::shared_ptr<std::vector<T>  >  Ptr;
>
>       CopyOnWriteArray() {}
>       CopyOnWriteArray(const CopyOnWriteArray&  c) : array(c.array) {}
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -69,7 +69,7 @@
>         // #### TODO: The Binding should take the query text
>         // #### only. Consider encapsulating the entire block, including
>         // #### the if condition.
> -
> +
>
>   bool XmlExchange::bind(Queue::shared_ptr queue, const string&  routingKey, const FieldTable* bindingArguments)
>   {
> @@ -97,7 +97,7 @@
>                           if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
>                               binding->parse_message_content = true;
>                               break;
> -                        }
> +                        }
>                       }
>                   }
>
> @@ -129,11 +129,11 @@
>           }
>           return true;
>       } else {
> -        return false;
> +        return false;
>       }
>   }
>
> -bool XmlExchange::matches(Query&  query, Deliverable&  msg, const qpid::framing::FieldTable* args, bool parse_message_content)
> +bool XmlExchange::matches(Query&  query, Deliverable&  msg, const qpid::framing::FieldTable* args, bool parse_message_content)
>   {
>     string msgContent;
>
> @@ -151,12 +151,12 @@
>
>             QPID_LOG(trace, "matches: message content is ["<<  msgContent<<  "]");
>
> -          XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
> +          XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
>                                                         msgContent.length(), "input" );
>
>   	// This will parse the document using either Xerces or FastXDM, depending
>   	// on your XQilla configuration. FastXDM can be as much as 10x faster.
> -	
> +
>             Sequence seq(context->parseDocument(xml));
>
>             if(!seq.isEmpty()&&  seq.first()->isNode()) {
> @@ -206,49 +206,26 @@
>       PreRoute pr(msg, this);
>       try {
>           XmlBinding::vector::ConstPtr p;
> -	{
> +        qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding>  >);
> +        {
>               RWlock::ScopedRlock l(lock);
> -	    p = bindingsMap[routingKey].snapshot();
> -	    if (!p) return;
> -	}
> -        int count(0);
> +            p = bindingsMap[routingKey].snapshot();
> +            if (!p.get()) return;
> +        }
>
>           for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
> -            if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
> -                msg.deliverTo((*i)->queue);
> -                count++;
> -                QPID_LOG(trace, "Delivered to queue" );
> -
> -                if ((*i)->mgmtBinding != 0)
> -                    (*i)->mgmtBinding->inc_msgMatched ();
> +            if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
> +                b->push_back(*i);
>               }
> -	}
> -	if (!count) {
> -	    QPID_LOG(warning, "XMLExchange "<<  getName()<<  ": could not route message with query "<<  routingKey);
> -	    if (mgmtExchange != 0) {
> -	        mgmtExchange->inc_msgDrops  ();
> -		mgmtExchange->inc_byteDrops (msg.contentSize ());
> -	    }
> -	} else {
> -	    if (mgmtExchange != 0) {
> -	        mgmtExchange->inc_msgRoutes  (count);
> -		mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
> -	    }
> -	}
> -
> -	if (mgmtExchange != 0) {
> -	    mgmtExchange->inc_msgReceives  ();
> -	    mgmtExchange->inc_byteReceives (msg.contentSize ());
> -	}
> +        }
> +        doRoute(msg, b);
>       } catch (...) {
>           QPID_LOG(warning, "XMLExchange "<<  getName()<<  ": exception routing message with query "<<  routingKey);
>       }
> -
> -
>   }
>
>
> -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
> +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
>   {
>       RWlock::ScopedRlock l(lock);
>       if (routingKey) {
> @@ -274,12 +251,12 @@
>   }
>
>
> -XmlExchange::~XmlExchange()
> +XmlExchange::~XmlExchange()
>   {
>       bindingsMap.clear();
>   }
>
>   const std::string XmlExchange::typeName("xml");
> -
> +
>   }
>   }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h?rev=813825&r1=813824&r2=813825&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h Fri Sep 11 13:33:42 2009
> @@ -7,9 +7,9 @@
>    * to you under the Apache License, Version 2.0 (the
>    * "License"); you may not use this file except in compliance
>    * with the License.  You may obtain a copy of the License at
> - *
> + *
>    *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>    * Unless required by applicable law or agreed to in writing,
>    * software distributed under the License is distributed on an
>    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -53,7 +53,7 @@
>               Binding(key, queue, parent), xquery(query), parse_message_content(true) {}
>       };
>
> -
> +
>       typedef std::map<string, XmlBinding::vector>  XmlBindingsMap;
>
>       XmlBindingsMap bindingsMap;
> @@ -64,13 +64,13 @@
>
>     public:
>       static const std::string typeName;
> -
> +
>       XmlExchange(const std::string&  name, management::Manageable* parent = 0, Broker* broker = 0);
>       XmlExchange(const string&  _name, bool _durable,
>   		const qpid::framing::FieldTable&  _args, management::Manageable* parent = 0, Broker* broker = 0);
>
>       virtual std::string getType() const { return typeName; }
> -
> +
>       virtual bool bind(Queue::shared_ptr queue, const std::string&  routingKey, const qpid::framing::FieldTable* args);
>
>       virtual bool unbind(Queue::shared_ptr queue, const std::string&  routingKey, const qpid::framing::FieldTable* args);
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
>


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Gordon Sim <gs...@redhat.com>.
On 09/21/2009 08:24 PM, Kim van der Riet wrote:
> On Mon, 2009-09-21 at 16:39 +0100, Gordon Sim wrote:
>> On 09/17/2009 06:21 PM, Gordon Sim wrote:
>>> I don't mind how many Jiras we use to track the issues. I have created
>>> two already, but don't mind if those get augmented or replaced,
>>> providing we make the problems being solved clear.
>>
>> Carl, Kim,
>>
>> I have attached a candidate fix to QPID-2102 that also addresses
>> QPID-2101. Have a look and let me know what you think. If we are all
>> happy with this, I can commit it. Else we can keep seeking something
>> agreeable to all.
>>
>> --Gordon
>>
> The code looks good.
>
> One question, however:
>
> I do not see isPersistent() in PersistableMessage - this will be needed
> for setting the transient flag correctly when enqueuing on the store.
> Did you handle it another way (by casting, for example)?

I did not make that change. It sounds like a separate issue?



Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Kim van der Riet <ki...@redhat.com>.
On Mon, 2009-09-21 at 16:39 +0100, Gordon Sim wrote:
> On 09/17/2009 06:21 PM, Gordon Sim wrote:
> > I don't mind how many Jiras we use to track the issues. I have created
> > two already, but don't mind if those get augmented or replaced,
> > providing we make the problems being solved clear.
> 
> Carl, Kim,
> 
> I have attached a candidate fix to QPID-2102 that also addresses
> QPID-2101. Have a look and let me know what you think. If we are all 
> happy with this, I can commit it. Else we can keep seeking something 
> agreeable to all.
> 
> --Gordon
> 
The code looks good.

One question, however:

I do not see isPersistent() in PersistableMessage - this will be needed
for setting the transient flag correctly when enqueuing on the store.
Did you handle it another way (by casting, for example)?

I'll make a separate checkin of the exchange route() refactoring from
the previous (subsequently backed out) checkin 813825. It is a helpful
change, but it is no longer needed for this issue.

Kim




Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Gordon Sim <gs...@redhat.com>.
On 09/17/2009 06:21 PM, Gordon Sim wrote:
> I don't mind how many Jiras we use to track the issues. I have created
> two already, but don't mind if those get augmented or replaced,
> providing we make the problems being solved clear.

Carl, Kim,

I have attached a candidate fix to QPID-2102 that also addresses
QPID-2101. Have a look and let me know what you think. If we are all 
happy with this, I can commit it. Else we can keep seeking something 
agreeable to all.

--Gordon



Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Gordon Sim <gs...@redhat.com>.
On 09/15/2009 02:13 PM, Carl Trieloff wrote:
> Gordon Sim wrote:
>> On 09/11/2009 02:33 PM, kpvdr@apache.org wrote:
>>> Author: kpvdr
>>> Date: Fri Sep 11 13:33:42 2009
>>> New Revision: 813825
>>>
>>> URL: http://svn.apache.org/viewvc?rev=813825&view=rev
>>> Log:
>>> Joint checkin with cctrieloff. Refactor of exchange routing so that
>>> multi-queue policy differences may be resolved and allow for correct
>>> multi-queue flow-to-disk behavior. Different queues may have
>>> differing policies and persistence properties - these were previously
>>> being neglected. New c++ test added.
>>
>> I'm not very keen on some of these changes. While the original code is
>> already unpleasant, this allows the bad smells to leak further into
>> the codebase.
>>
>> On a separate point, though it's great to get whitespace fixed up, when
>> a commit like this one contains >60% whitespace changes mixed up
>> with various real semantic changes it becomes much harder to review.
>>
>> However, back to the more important issue, the actual changes and the
>> specific things I dislike:
>>
>> * setting the store on a message in Queue::push()
>>
>> This means that if a message previously pushed on to a durable queue
>> then gets pushed on to a transient queue, the store pointer will be
>> set to null.
>>
>> Though that case could be fixed by a simple check this seems like the
>> wrong place to be associating the message with a store. In fact the
>> same line has also been added to the MessageBuilder (which I think is
>> a reasonable change), so it's not clear why it's called again here.
>>
>
> Agree -- passing it on release was worse. We should reason through whether
> we could pass it only in MessageBuilder. Thoughts on that?

Setting it in MessageBuilder is sufficient.

>> (And though checking whether all queues are persistent before routing
>> may mean that in the common cases overwriting the store causes no
>> harm, what about the case where a queue is deleted and its messages
>> are routed through an alternate exchange?).
>
> Don't know. Are you able to suggest a test for this case, so that I can
> understand what would be failing?

Create a durable queue with an alternate exchange specified and a second 
durable queue with a flow to disk policy specified. Bind both to a 
fanout exchange and send enough messages to that exchange to trigger 
flow to disk for the second queue, releasing message content. Bind a 
third queue - transient - to the alternate exchange for the first queue. 
Delete the first queue (this will route the messages through the 
alternate exchange and enqueue them in the third queue). Consume the 
messages from the second queue.

However, removing the setStore() call from Queue::push() would 
resolve the issue.
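
A minimal sketch of why the scenario above fails, using hypothetical 
stand-in types (Store, Message, Queue and deliverableAfterReroute are 
illustrative only and are not the real qpid::broker classes). It assumes 
that released content can only be reloaded through the message's store 
pointer:

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins; these are NOT the real qpid::broker classes.
struct Store {};

struct Message {
    Store* store = nullptr;        // set once when the message is built
    bool contentReleased = false;  // true once content lives only on disk
    void setStore(Store* s) { store = s; }
    // Reloading released content needs a store to read it back from.
    bool canLoadContent() const { return !contentReleased || store != nullptr; }
};

struct Queue {
    Store* store;          // nullptr models a transient queue
    bool overwriteOnPush;  // models the setStore() call added to push()
    void push(const std::shared_ptr<Message>& m) const {
        if (overwriteOnPush) m->setStore(store);
    }
};

// Returns whether a consumer on the flow-to-disk queue could still load
// the content after the message was re-routed to a transient queue.
bool deliverableAfterReroute(bool overwriteOnPush) {
    Store disk;
    auto msg = std::make_shared<Message>();
    msg->setStore(&disk);            // what MessageBuilder would do
    Queue flowToDisk{&disk, overwriteOnPush};
    Queue transient{nullptr, overwriteOnPush};
    flowToDisk.push(msg);
    msg->contentReleased = true;     // flow-to-disk limit reached
    transient.push(msg);             // re-routed via the alternate exchange
    assert(msg->contentReleased);
    return msg->canLoadContent();
}
```

In this model deliverableAfterReroute(true) loses the store pointer and 
the content can no longer be loaded, while deliverableAfterReroute(false), 
where the store is set only at build time, keeps it.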

>> * iterating through all matching bindings to check that the queues
>> have a persistence id and taking that to mean they are persistent and
>> a store is loaded before iterating over the same list again to route
>> each message
>>
>> I find that a little ugly and it leaks aspects of the rather
>> unpleasant flow-to-disk 'feature' to the exchange class.
>
>
> ack, however we have a bit of an issue here which we need to deal with.
>
> The basic case is, if we don't have a store module loaded, then the
> flow-to-disk limits will be applied as reject. However if we can't flow
> a message to disk when the store is loaded then we would just not flow
> it to disk. This is totally inconsistent.

I'm not sure I agree. Flow to disk is not supported on a non-durable 
queue; if you ask for it you will instead get a reject policy. That will 
be in effect for every message enqueued on that queue, regardless of the 
routing.
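
As a rough illustration of that rule (a sketch only; LimitPolicy, 
effectivePolicy and BoundedQueue are hypothetical names, not the broker's 
API), the policy is fixed per queue when the queue is created and does 
not depend on how any individual message was routed:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model; names are illustrative, not the broker's API.
enum class LimitPolicy { Reject, FlowToDisk };

// A flow-to-disk request only sticks if the queue is durable;
// otherwise the queue falls back to rejecting at the limit.
LimitPolicy effectivePolicy(LimitPolicy requested, bool durable) {
    if (requested == LimitPolicy::FlowToDisk && !durable)
        return LimitPolicy::Reject;
    return requested;
}

class BoundedQueue {
    LimitPolicy policy;
    std::size_t limit;
    std::size_t depth;
  public:
    BoundedQueue(LimitPolicy requested, bool durable, std::size_t max)
        : policy(effectivePolicy(requested, durable)), limit(max), depth(0) {
        assert(max > 0);
    }
    // Returns false when the message is rejected at the limit.
    bool enqueue() {
        if (depth >= limit && policy == LimitPolicy::Reject) return false;
        ++depth;  // beyond the limit, content would be flowed to disk
        return true;
    }
};
```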

The case where you have a message enqueued on multiple queues and don't 
want to release it from memory is an entirely different case. This case 
is dependent on the number of other matching bindings for the published 
message.

> We also have the issue that
> our publish to multiple queues is not atomic outside a txn.

Checking that all queues are durable beforehand does nothing to change 
that though.

> That we need
> to debate if we want to change that, but I don't see any way to resolve
> the former without checking the queue state prior to routing. Any other
> ideas welcome.

As above I'm not convinced that any option is more 'consistent' than any 
other. The root issue is where the 'policy' on one queue could affect 
other queues with matching bindings and how to deal with that.

The real solution to that is a bigger job, but the most urgent 
manifestation of this is where releasing the content of the shared 
message due to a policy on one queue prevents that message being 
delivered on another.

My view would be that we simply don't release the content if we 
determine that not all the queues the message is enqueued on will be 
able to handle that. I think that will result in a cleaner short-term 
fix for the immediate problem.
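
A minimal sketch of that idea, assuming the message itself tracks the 
queues it is enqueued on. QueueInfo, SharedMessage and canReloadContent 
are hypothetical names used for illustration, and "able to handle that" 
is modelled here simply as a per-queue flag:

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical stand-ins, not the real broker types.
struct QueueInfo {
    bool canReloadContent;  // e.g. a durable queue with a store loaded
};

class SharedMessage {
    std::vector<QueueInfo> enqueuedOn;
    bool released;
  public:
    SharedMessage() : released(false) {}
    void enqueued(const QueueInfo& q) { enqueuedOn.push_back(q); }
    // Called when one queue's policy asks for the content to be released.
    // The content is only dropped from memory if every queue holding the
    // message could load it again later.
    bool requestRelease() {
        assert(!enqueuedOn.empty());
        bool allCan = std::all_of(
            enqueuedOn.begin(), enqueuedOn.end(),
            [](const QueueInfo& q) { return q.canReloadContent; });
        if (allCan) released = true;
        return released;
    }
    bool isReleased() const { return released; }
};
```

With this shape the decision lives with the message rather than the 
exchange, so no extra pass over the bindings is needed before routing.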

> Some good observations - nice review, however I don't think we can
> consider releasing trunk until this is cleaned up.
>
> There are a few things that the patch does clean up:
> - the massive code duplication in exchange route
> - the consolidation of all the release state into one manager.
>
> I do however agree that we might want to find a way to encapsulate the
> release action into Persistable message thus removing the release from
> SemanticState, which will also correct the txn case.
>
> I would suggest that we correct the signatures on the route re-factor
> part, the setting of store ptr & move the release logic all to the
> manager & persistableMessage and we see what the patch looks like.
>
>
> other notes:
>
> Issues that need to be covered are:
> - delaying release content till after processing queueEvents
> - delaying release of content until all messages have been enqueued
> - making behaviours consistent when a message can be flowed to disk
> - stop using stage() for flow to disk messages
> - correct ftd txn behaviour
> - resolving the ability to be able to read directly on write
>
> I think we can track this with one JIRA, as I think the issues need to
> be worked together to be able to get a clean refactor which I believe is
> the goal.

I agree a clean set of changes is the goal: living with those design 
defects we cannot remedy immediately, but trying hard not to make things 
any more incoherent.

I don't mind how many Jiras we use to track the issues. I have created 
two already, but don't mind if those get augmented or replaced, 
providing we make the problems being solved clear.



Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Carl Trieloff <cc...@redhat.com>.
Gordon Sim wrote:
> On 09/11/2009 02:33 PM, kpvdr@apache.org wrote:
>> Author: kpvdr
>> Date: Fri Sep 11 13:33:42 2009
>> New Revision: 813825
>>
>> URL: http://svn.apache.org/viewvc?rev=813825&view=rev
>> Log:
>> Joint checkin with cctrieloff. Refactor of exchange routing so that 
>> multi-queue policy differences may be resolved and allow for correct 
>> multi-queue flow-to-disk behavior. Different queues may have 
>> differing policies and persistence properties - these were previously 
>> being neglected. New c++ test added.
>
> I'm not very keen on some of these changes. While the original code is 
> already unpleasant, this allows the bad smells to leak further into 
> the codebase.
>
> On a separate point, though it's great to get whitespace fixed up, when 
> a commit like this one contains >60% whitespace changes mixed up 
> with various real semantic changes it becomes much harder to review.
>
> However, back to the more important issue, the actual changes and the 
> specific things I dislike:
>
> * setting the store on a message in Queue::push()
>
> This means that if a message previously pushed on to a durable queue 
> then gets pushed on to a transient queue, the store pointer will be 
> set to null.
>
> Though that case could be fixed by a simple check this seems like the 
> wrong place to be associating the message with a store. In fact the 
> same line has also been added to the MessageBuilder (which I think is 
> a reasonable change), so it's not clear why it's called again here.
>

Agree -- passing it on release was worse. We should reason through whether 
we could pass it only in MessageBuilder. Thoughts on that?


> (And though checking whether all queues are persistent before routing 
> may mean that in the common cases overwriting the store causes no 
> harm, what about the case where a queue is deleted and its messages 
> are routed through an alternate exchange?).

Don't know. Are you able to suggest a test for this case, so that I can 
understand what would be failing?


>
> * iterating through all matching bindings to check that the queues 
> have a persistence id and taking that to mean they are persistent and 
> a store is loaded before iterating over the same list again to route 
> each message
>
> I find that a little ugly and it leaks aspects of the rather 
> unpleasant flow-to-disk 'feature' to the exchange class.


ack, however we have a bit of an issue here which we need to deal with.

The basic case is, if we don't have a store module loaded, then the 
flow-to-disk limits will be applied as reject. However if we can't flow 
a message to disk when the store is loaded then we would just not flow 
it to disk. This is totally inconsistent. We also have the issue that 
our publish to multiple queues is not atomic outside a txn. We need to 
debate whether we want to change that, but I don't see any way to resolve 
the former without checking the queue state prior to routing. Any other 
ideas welcome.



>
> * using qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr as 
> the type of the parameter to the new doRoute method on Exchange.
>
> It would be far better to just use a const reference to a 
> Binding::vector. The CopyOnWriteArray logic is irrelevant to the 
> method in question, all that is being used is that typedef which just 
> makes someone reading the code work harder to figure out it is 
> irrelevant.
>
> That is particularly the case in the Topic exchange for which the new 
> typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr is added 
> to CopyOnWriteArray for no apparent reason and used in this one place 
> (again when none of the CopyOnWriteArray logic is being used).

Yes, a Binding::vector would be better (CopyOnWriteArray is not used, so 
there is no point having it in the signature).
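
A sketch of what the simpler signature could look like. The Binding and 
Queue types here are cut-down stand-ins, std::shared_ptr is assumed in 
place of whatever smart pointer the broker uses, and the doRoute body is 
illustrative only:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Cut-down stand-ins for illustration; not the real broker classes.
struct Queue {
    std::vector<std::string> messages;
};

struct Binding {
    typedef std::shared_ptr<Binding> shared_ptr;
    typedef std::vector<shared_ptr> vector;
    std::shared_ptr<Queue> queue;
};

// Takes a plain const reference: nothing about copy-on-write appears in
// the signature, because the method only ever reads the bindings.
std::size_t doRoute(const std::string& msg, const Binding::vector& bindings) {
    std::size_t count = 0;
    for (const Binding::shared_ptr& b : bindings) {
        b->queue->messages.push_back(msg);
        ++count;
    }
    return count;
}
```

A caller that holds a pointer to a snapshot of the bindings can still use 
this by dereferencing it at the call site, e.g. doRoute(msg, *snapshot).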

>
> * perpetuating the unclear division of role between PersistableMessage 
> and Message itself; the logic for releasing and controlling the 
> release seems split between these two in what seems like an arbitrary 
> manner.
>
> Note also that with this change transactionally published messages 
> will never be released from memory as the call to do so (if required) 
> in SemanticState will happen before any request has been made to 
> release the content.
>
> The code for this aspect clearly has a number of problems which we 
> need to fix - creating Jiras for these would I think be valuable. I 
> think we need to try hard to get the cleanest possible set of changes 
> though, and prevent the already incoherent design from deteriorating 
> even further.

Some good observations - nice review, however I don't think we can 
consider releasing trunk until this is cleaned up.

There are a few things that the patch does clean up:
 - the massive code duplication in exchange route
 - the consolidation of all the release state into one manager.

I do however agree that we might want to find a way to encapsulate the 
release action into PersistableMessage, thus removing the release from 
SemanticState, which will also correct the txn case.

I would suggest that we correct the signatures in the route refactor, 
fix the setting of the store pointer, move all of the release logic into 
the manager and PersistableMessage, and then see what the patch looks like.


other notes:

Issues that need to be covered are:
 - delaying release content till after processing queueEvents
 - delaying release of content until all messages have been enqueued
 - making behaviours consistent when a message can be flowed to disk
 - stop using stage() for flow to disk messages
 - correct ftd txn behaviour
 - resolving the ability to be able to read directly on write

I think we can track this with one JIRA, as I think the issues need to 
be worked together to be able to get a clean refactor which I believe is 
the goal.

my 2 cents.





Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Gordon Sim <gs...@redhat.com>.
On 09/14/2009 04:25 PM, Kim van der Riet wrote:
> On Mon, 2009-09-14 at 13:46 +0100, Gordon Sim wrote:
>> On 09/11/2009 02:33 PM, kpvdr@apache.org wrote:
>>> Author: kpvdr
>>> Date: Fri Sep 11 13:33:42 2009
>>> New Revision: 813825
>>>
>>> URL: http://svn.apache.org/viewvc?rev=813825&view=rev
>>> Log:
>>> Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.
>>
>> I'm not very keen on some of these changes. While the original code is
>> already unpleasant, this allows the bad smells to leak further into the
>> codebase.
>
> I have rolled this checkin back out in r814692.

That was more drastic than I was suggesting! FYI, I have created two 
Jiras for the specific issues I'm aware of:

   https://issues.apache.org/jira/browse/QPID-2101
   https://issues.apache.org/jira/browse/QPID-2102



Re: svn commit: r813825 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Posted by Kim van der Riet <ki...@redhat.com>.
On Mon, 2009-09-14 at 13:46 +0100, Gordon Sim wrote:
> On 09/11/2009 02:33 PM, kpvdr@apache.org wrote:
> > Author: kpvdr
> > Date: Fri Sep 11 13:33:42 2009
> > New Revision: 813825
> >
> > URL: http://svn.apache.org/viewvc?rev=813825&view=rev
> > Log:
> > Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.
> 
> I'm not very keen on some of these changes. While the original code is 
> already unpleasant, this allows the bad smells to leak further into the 
> codebase.

I have rolled this checkin back out in r814692.

