You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [9/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,7 +31,10 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/FifoDistributor.h"
+#include "qpid/broker/MessageGroupManager.h"
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
@@ -41,6 +44,7 @@
 #include "qpid/sys/ClusterSafe.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
 
@@ -64,7 +68,7 @@ using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 
-namespace 
+namespace
 {
 const std::string qpidMaxSize("qpid.max_size");
 const std::string qpidMaxCount("qpid.max_count");
@@ -86,16 +90,16 @@ const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
 }
 
-Queue::Queue(const string& _name, bool _autodelete, 
+Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
              const OwnershipToken* const _owner,
              Manageable* parent,
              Broker* b) :
 
-    name(_name), 
+    name(_name),
     autodelete(_autodelete),
     store(_store),
-    owner(_owner), 
+    owner(_owner),
     consumerCount(0),
     exclusive(0),
     noLocal(false),
@@ -110,7 +114,8 @@ Queue::Queue(const string& _name, bool _
     broker(b),
     deleted(false),
     barrier(*this),
-    autoDeleteTimeout(0)
+    autoDeleteTimeout(0),
+    allocator(new FifoDistributor( *messages ))
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -163,13 +168,8 @@ void Queue::deliver(boost::intrusive_ptr
         //drop message
         QPID_LOG(info, "Dropping excluded message from " << getName());
     } else {
-        // if no store then mark as enqueued
-        if (!enqueue(0, msg)){
-            push(msg);
-            msg->enqueueComplete();
-        }else {
-            push(msg);
-        }
+        enqueue(0, msg);
+        push(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
@@ -183,11 +183,10 @@ void Queue::recover(boost::intrusive_ptr
     if (policy.get()) policy->recoverEnqueued(msg);
 
     push(msg, true);
-    if (store){ 
+    if (store){
         // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
-        msg->addToSyncList(shared_from_this(), store); 
+        msg->addToSyncList(shared_from_this(), store);
     }
-    msg->enqueueComplete(); // mark the message as enqueued
 
     if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
@@ -211,14 +210,13 @@ void Queue::process(boost::intrusive_ptr
 void Queue::requeue(const QueuedMessage& msg){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    {    
+    {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
-        msg.payload->enqueueComplete(); // mark the message as enqueued
         messages->reinsert(msg);
         listeners.populate(copy);
 
-        // for persistLastNode - don't force a message twice to disk, but force it if no force before 
+        // for persistLastNode - don't force a message twice to disk, but force it if no force before
         if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
             msg.payload->forcePersistent();
             if (msg.payload->isForcedPersistent() ){
@@ -226,16 +224,17 @@ void Queue::requeue(const QueuedMessage&
             	enqueue(0, payload);
             }
         }
+        observeRequeue(msg, locker);
     }
     copy.notify();
 }
 
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (messages->remove(position, message)) {
+    if (acquire(position, message, locker)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
     } else {
@@ -244,9 +243,24 @@ bool Queue::acquireMessageAt(const Seque
     }
 }
 
-bool Queue::acquire(const QueuedMessage& msg) {
-    QueuedMessage copy = msg;
-    return acquireMessageAt(msg.position, copy);
+bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
+{
+    Mutex::ScopedLock locker(messageLock);
+    assertClusterSafe();
+    QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
+
+    if (!allocator->allocate( consumer, msg )) {
+        QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
+        return false;
+    }
+
+    QueuedMessage copy(msg);
+    if (acquire( msg.position, copy, locker)) {
+        QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
+        return true;
+    }
+    QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
+    return false;
 }
 
 void Queue::notifyListener()
@@ -262,7 +276,7 @@ void Queue::notifyListener()
     set.notify();
 }
 
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
     checkNotDeleted();
     if (c->preAcquires()) {
@@ -274,52 +288,71 @@ bool Queue::getNextMessage(QueuedMessage
           case NO_MESSAGES:
           default:
             return false;
-        }        
+        }
     } else {
         return browseNextMessage(m, c);
     }
 }
 
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        if (messages->empty()) { 
-            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+        QueuedMessage msg;
+
+        if (!allocator->nextConsumableMessage(c, msg)) { // no next available
+            QPID_LOG(debug, "No messages available to dispatch to consumer " <<
+                     c->getName() << " on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
-        } else {
-            QueuedMessage msg = messages->front();
-            if (msg.payload->hasExpired()) {
-                QPID_LOG(debug, "Message expired from queue '" << name << "'");
-                popAndDequeue();
-                continue;
-            }
+        }
 
-            if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {            
-                    m = msg;
-                    pop();
-                    return CONSUMED;
-                } else {
-                    //message(s) are available but consumer hasn't got enough credit
-                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
-                    return CANT_CONSUME;
-                }
+        if (msg.payload->hasExpired()) {
+            QPID_LOG(debug, "Message expired from queue '" << name << "'");
+            c->position = msg.position;
+            acquire( msg.position, msg, locker);
+            dequeue( 0, msg );
+            continue;
+        }
+
+        // a message is available for this consumer - can the consumer use it?
+
+        if (c->filter(msg.payload)) {
+            if (c->accept(msg.payload)) {
+                bool ok = allocator->allocate( c->getName(), msg );  // inform allocator
+                (void) ok; assert(ok);
+                ok = acquire( msg.position, msg, locker);
+                (void) ok; assert(ok);
+                m = msg;
+                c->position = m.position;
+                return CONSUMED;
             } else {
-                //consumer will never want this message
-                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+                //message(s) are available but consumer hasn't got enough credit
+                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
                 return CANT_CONSUME;
-            } 
+            }
+        } else {
+            //consumer will never want this message
+            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+            c->position = msg.position;
+            return CANT_CONSUME;
         }
     }
 }
 
-
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
-    QueuedMessage msg(this);
-    while (seek(msg, c)) {
+    while (true) {
+        Mutex::ScopedLock locker(messageLock);
+        QueuedMessage msg;
+
+        if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+            QPID_LOG(debug, "No browsable messages available for consumer " <<
+                     c->getName() << " on queue '" << name << "'");
+            listeners.addListener(c);
+            return false;
+        }
+
         if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
             if (c->accept(msg.payload)) {
                 //consumer wants the message
@@ -333,8 +366,8 @@ bool Queue::browseNextMessage(QueuedMess
             }
         } else {
             //consumer will never want this message, continue seeking
-            c->position = msg.position;
             QPID_LOG(debug, "Browser skipping message from '" << name << "'");
+            c->position = msg.position;
         }
     }
     return false;
@@ -364,61 +397,71 @@ bool Queue::dispatch(Consumer::shared_pt
     }
 }
 
-// Find the next message 
-bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
-    Mutex::ScopedLock locker(messageLock);
-    if (messages->next(c->position, msg)) {
-        return true;
-    } else {
-        listeners.addListener(c);
-        return false;
-    }
-}
-
-QueuedMessage Queue::find(SequenceNumber pos) const {
+bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
 
     Mutex::ScopedLock locker(messageLock);
-    QueuedMessage msg;
-    messages->find(pos, msg);
-    return msg;
+    if (messages->find(pos, msg))
+        return true;
+    return false;
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     assertClusterSafe();
-    Mutex::ScopedLock locker(consumerLock);
-    if(exclusive) {
-        throw ResourceLockedException(
-            QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-    } else if(requestExclusive) {
-        if(consumerCount) {
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        if(exclusive) {
             throw ResourceLockedException(
-                QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-        } else {
-            exclusive = c->getSession();
+                                          QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+        } else if(requestExclusive) {
+            if(consumerCount) {
+                throw ResourceLockedException(
+                                              QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+            } else {
+                exclusive = c->getSession();
+            }
+        }
+        consumerCount++;
+        if (mgmtObject != 0)
+            mgmtObject->inc_consumerCount ();
+        //reset auto deletion timer if necessary
+        if (autoDeleteTimeout && autoDeleteTask) {
+            autoDeleteTask->cancel();
         }
     }
-    consumerCount++;
-    if (mgmtObject != 0)
-        mgmtObject->inc_consumerCount ();
-    //reset auto deletion timer if necessary
-    if (autoDeleteTimeout && autoDeleteTask) {
-        autoDeleteTask->cancel();
+    Mutex::ScopedLock locker(messageLock);
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumerAdded(*c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+        }
     }
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
-    Mutex::ScopedLock locker(consumerLock);
-    consumerCount--;
-    if(exclusive) exclusive = 0;
-    if (mgmtObject != 0)
-        mgmtObject->dec_consumerCount ();
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        consumerCount--;
+        if(exclusive) exclusive = 0;
+        if (mgmtObject != 0)
+            mgmtObject->dec_consumerCount ();
+    }
+    Mutex::ScopedLock locker(messageLock);
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumerRemoved(*c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+        }
+    }
 }
 
 QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    messages->pop(msg);
+    if (messages->pop(msg))
+        observeAcquire(msg, locker);
     return msg;
 }
 
@@ -432,22 +475,135 @@ bool collect_if_expired(std::deque<Queue
     }
 }
 
-void Queue::purgeExpired()
+/**
+ * @param lapse time elapsed since the last purgeExpired
+ */
+void Queue::purgeExpired(qpid::sys::Duration lapse)
 {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
-    //attempt is less than one per second.  
-
-    if (dequeueTracker.sampleRatePerSecond() < 1) {
+    //attempt is less than one per second.
+    int count = dequeueSincePurge.get();
+    dequeueSincePurge -= count;
+    int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
+    if (seconds == 0 || count / seconds < 1) {
         std::deque<QueuedMessage> expired;
         {
             Mutex::ScopedLock locker(messageLock);
-            messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
+            messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
+        }
+
+        for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+             i != expired.end(); ++i) {
+            {
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquire(*i, locker);
+            }
+            dequeue( 0, *i );
         }
-        for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 }
 
+
+namespace {
+    // for use with purge/move below - collect messages that match a given filter
+    //
+    class MessageFilter
+    {
+    public:
+        static const std::string typeKey;
+        static const std::string paramsKey;
+        static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
+        virtual bool match( const QueuedMessage& ) const { return true; }
+        virtual ~MessageFilter() {}
+    protected:
+        MessageFilter() {};
+    };
+    const std::string MessageFilter::typeKey("filter_type");
+    const std::string MessageFilter::paramsKey("filter_params");
+
+    // filter by message header string value exact match
+    class HeaderMatchFilter : public MessageFilter
+    {
+    public:
+        /* Config:
+           { 'filter_type' : 'header_match_str',
+             'filter_params' : { 'header_key' : "<header name>",
+                                 'header_value' : "<value to match>"
+                               }
+           }
+        */
+        static const std::string typeKey;
+        static const std::string headerKey;
+        static const std::string valueKey;
+        HeaderMatchFilter( const std::string& _header, const std::string& _value )
+            : MessageFilter (), header(_header), value(_value) {}
+        bool match( const QueuedMessage& msg ) const
+        {
+            const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
+            if (!headers) return false;
+            FieldTable::ValuePtr h = headers->get(header);
+            if (!h || !h->convertsTo<std::string>()) return false;
+            return h->get<std::string>() == value;
+        }
+    private:
+        const std::string header;
+        const std::string value;
+    };
+    const std::string HeaderMatchFilter::typeKey("header_match_str");
+    const std::string HeaderMatchFilter::headerKey("header_key");
+    const std::string HeaderMatchFilter::valueKey("header_value");
+
+    // factory to create correct filter based on map
+    MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
+    {
+        using namespace qpid::types;
+        if (filter && !filter->empty()) {
+            Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
+            if (i != filter->end()) {
+
+                if (i->second.asString() == HeaderMatchFilter::typeKey) {
+                    Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
+                    if (p != filter->end() && p->second.getType() == VAR_MAP) {
+                        Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
+                        Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
+                        if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
+                            std::string headerKey(k->second.asString());
+                            std::string value(v->second.asString());
+                            QPID_LOG(debug, "Message filtering by header value configured.  key: " << headerKey << " value: " << value );
+                            return new HeaderMatchFilter( headerKey, value );
+                        }
+                    }
+                }
+            }
+            QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
+        }
+        return new MessageFilter();
+    }
+
+    // used by removeIf() to collect all messages matching a filter; the maximum match
+    // count is optional (a maxMatches of 0 means no limit).
+    struct Collector {
+        const uint32_t maxMatches;
+        MessageFilter& filter;
+        std::deque<QueuedMessage> matches;
+        Collector(MessageFilter& filter, uint32_t max)
+            : maxMatches(max), filter(filter) {}
+        bool operator() (QueuedMessage& qm)
+        {
+            if (maxMatches == 0 || matches.size() < maxMatches) {
+                if (filter.match( qm )) {
+                    matches.push_back(qm);
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+} // end namespace
+
+
 /**
  * purge - for purging all or some messages on a queue
  *         depending on the purge_request
@@ -459,63 +615,77 @@ void Queue::purgeExpired()
  * The dest exchange may be supplied to re-route messages through the exchange.
  * It is safe to re-route messages such that they arrive back on the same queue,
  * even if the queue is ordered by priority.
+ *
+ * An optional filter can be supplied that will be applied against each message.  The
+ * message is purged only if the filter matches.  See MessageFilter for more detail.
  */
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+                      const qpid::types::Variant::Map *filter)
 {
-    Mutex::ScopedLock locker(messageLock);
-    uint32_t purge_count = purge_request; // only comes into play if  >0 
-    std::deque<DeliverableMessage> rerouteQueue;
+    std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+    Collector c(*mf.get(), purge_request);
 
-    uint32_t count = 0;
-    // Either purge them all or just the some (purge_count) while the queue isn't empty.
-    while((!purge_request || purge_count--) && !messages->empty()) {
+    Mutex::ScopedLock locker(messageLock);
+    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+         qmsg != c.matches.end(); ++qmsg) {
+        // Update observers and message state:
+        observeAcquire(*qmsg, locker);
+        dequeue(0, *qmsg);
+        // now reroute if necessary
         if (dest.get()) {
-            //
-            // If there is a destination exchange, stage the messages onto a reroute queue
-            // so they don't wind up getting purged more than once.
-            //
-            DeliverableMessage msg(messages->front().payload);
-            rerouteQueue.push_back(msg);
+            assert(qmsg->payload);
+            DeliverableMessage dmsg(qmsg->payload);
+            dest->routeWithAlternate(dmsg);
         }
-        popAndDequeue();
-        count++;
     }
-
-    //
-    // Re-route purged messages into the destination exchange.  Note that there's no need
-    // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
-    //
-    while (!rerouteQueue.empty()) {
-        DeliverableMessage msg(rerouteQueue.front());
-        rerouteQueue.pop_front();
-        dest->route(msg, msg.getMessage().getRoutingKey(),
-                    msg.getMessage().getApplicationHeaders());
-    }
-
-    return count;
+    return c.matches.size();
 }
 
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
+                     const qpid::types::Variant::Map *filter)
+{
+    std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+    Collector c(*mf.get(), qty);
+
     Mutex::ScopedLock locker(messageLock);
-    uint32_t move_count = qty; // only comes into play if  qty >0 
-    uint32_t count = 0; // count how many were moved for returning
+    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
 
-    while((!qty || move_count--) && !messages->empty()) {
-        QueuedMessage qmsg = messages->front();
-        boost::intrusive_ptr<Message> msg = qmsg.payload;
-        destq->deliver(msg); // deliver message to the destination queue
-        pop();
-        dequeue(0, qmsg);
-        count++;
+    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+         qmsg != c.matches.end(); ++qmsg) {
+        // Update observers and message state:
+        observeAcquire(*qmsg, locker);
+        dequeue(0, *qmsg);
+        // and move to destination Queue.
+        assert(qmsg->payload);
+        destq->deliver(qmsg->payload);
     }
-    return count;
+    return c.matches.size();
 }
 
-void Queue::pop()
+/** Acquire the front (oldest) message from the in-memory queue.
+ * assumes messageLock held by caller
+ */
+void Queue::pop(const Mutex::ScopedLock& locker)
 {
     assertClusterSafe();
-    messages->pop();
-    ++dequeueTracker;
+    QueuedMessage msg;
+    if (messages->pop(msg)) {
+        observeAcquire(msg, locker);
+        ++dequeueSincePurge;
+    }
+}
+
+/** Acquire the message at the given position, return true and msg if acquire succeeds */
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+                    const Mutex::ScopedLock& locker)
+{
+    if (messages->remove(position, msg)) {
+        observeAcquire(msg, locker);
+        ++dequeueSincePurge;
+        return true;
+    }
+    return false;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -524,13 +694,15 @@ void Queue::push(boost::intrusive_ptr<Me
     QueuedMessage removed;
     bool dequeueRequired = false;
     {
-        Mutex::ScopedLock locker(messageLock);   
+        Mutex::ScopedLock locker(messageLock);
         QueuedMessage qm(this, msg, ++sequence);
-        if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-         
+        if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
+
         dequeueRequired = messages->push(qm, removed);
+        if (dequeueRequired)
+            observeAcquire(removed, locker);
         listeners.populate(copy);
-        enqueued(qm);
+        observeEnqueue(qm, locker);
     }
     copy.notify();
     if (dequeueRequired) {
@@ -546,7 +718,7 @@ void Queue::push(boost::intrusive_ptr<Me
 
 void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
 {
-    if (message.payload->isEnqueueComplete()) (*result)++;
+    if (message.payload->isIngressComplete()) (*result)++;
 }
 
 /** function only provided for unit tests, or code not in critical message path */
@@ -606,7 +778,7 @@ void Queue::setLastNodeFailure()
 }
 
 
-// return true if store exists, 
+// return true if store exists,
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
 {
     ScopedUse u(barrier);
@@ -620,24 +792,21 @@ bool Queue::enqueue(TransactionContext* 
             policy->getPendingDequeues(dequeues);
         }
         //depending on policy, may have some dequeues that need to performed without holding the lock
-        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));        
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 
     if (inLastNodeFailure && persistLastNode){
         msg->forcePersistent();
     }
-       
+
     if (traceId.size()) {
-        //copy on write: take deep copy of message before modifying it
-        //as the frames may already be available for delivery on other
-        //threads
-        boost::intrusive_ptr<Message> copy(new Message(*msg));
-        msg = copy;
         msg->addTraceId(traceId);
     }
 
     if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
-        msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+        // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
+        // when it considers the message stored.
+        msg->enqueueAsync(shared_from_this(), store);
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;
@@ -654,10 +823,10 @@ bool Queue::enqueue(TransactionContext* 
 void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->enqueueAborted(msg);       
+    if (policy.get()) policy->enqueueAborted(msg);
 }
 
-// return true if store exists, 
+// return true if store exists,
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
     ScopedUse u(barrier);
@@ -666,8 +835,8 @@ bool Queue::dequeue(TransactionContext* 
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
-        if (!ctxt) { 
-            dequeued(msg);
+        if (!ctxt) {
+            observeDequeue(msg, locker);
         }
     }
     // This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -687,7 +856,7 @@ bool Queue::dequeue(TransactionContext* 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    dequeued(msg);    
+    observeDequeue(msg, locker);
     if (mgmtObject != 0) {
         mgmtObject->inc_msgTxnDequeues();
         mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -695,21 +864,23 @@ void Queue::dequeueCommitted(const Queue
 }
 
 /**
- * Removes a message from the in-memory delivery queue as well
- * dequeing it from the logical (and persistent if applicable) queue
+ * Removes the first (oldest) message from the in-memory delivery queue as well as
+ * dequeuing it from the logical (and persistent if applicable) queue
  */
-void Queue::popAndDequeue()
+void Queue::popAndDequeue(const Mutex::ScopedLock& held)
 {
-    QueuedMessage msg = messages->front();
-    pop();
-    dequeue(0, msg);
+    if (!messages->empty()) {
+        QueuedMessage msg = messages->front();
+        pop(held);
+        dequeue(0, msg);
+    }
 }
 
 /**
  * Updates policy and management when a message has been dequeued,
  * expects messageLock to be held
  */
-void Queue::dequeued(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
 {
     if (policy.get()) policy->dequeued(msg);
     mgntDeqStats(msg.payload);
@@ -722,6 +893,33 @@ void Queue::dequeued(const QueuedMessage
     }
 }
 
+/** updates queue observers when a message has become unavailable for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->acquired(msg);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
+/** updates queue observers when a message has become re-available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->requeued(msg);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+        }
+    }
+}
 
 void Queue::create(const FieldTable& _settings)
 {
@@ -729,7 +927,7 @@ void Queue::create(const FieldTable& _se
     if (store) {
         store->create(*this, _settings);
     }
-    configure(_settings);
+    configureImpl(_settings);
 }
 
 
@@ -742,8 +940,8 @@ int getIntegerSetting(const qpid::framin
         return v->get<int>();
     } else if (v->convertsTo<std::string>()){
         std::string s = v->get<std::string>();
-        try { 
-            return boost::lexical_cast<int>(s); 
+        try {
+            return boost::lexical_cast<int>(s);
         } catch(const boost::bad_lexical_cast&) {
             QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
             return 0;
@@ -754,15 +952,45 @@ int getIntegerSetting(const qpid::framin
     }
 }
 
-void Queue::configure(const FieldTable& _settings, bool recovering)
+bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key)
 {
+    // Look the key up once; a missing value is treated as false.
+    qpid::framing::FieldTable::ValuePtr value = settings.get(key);
+    if (!value) return false;
+    // Integer-convertible values: anything non-zero is true.
+    if (value->convertsTo<int>()) return value->get<int>() != 0;
+
+    if (!value->convertsTo<std::string>()) {
+        QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *value);
+        return false;
+    }
+
+    // Strings: recognise the spelled-out forms first, then let boost::lexical_cast decide.
+    const std::string text = value->get<std::string>();
+    if (text == "True" || text == "true") return true;
+    if (text == "False" || text == "false") return false;
+    try {
+        return boost::lexical_cast<bool>(text);
+    } catch(const boost::bad_lexical_cast&) {
+        QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << text);
+        return false;
+    }
+}
+
+void Queue::configure(const FieldTable& _settings)
+{
+    settings = _settings;     // keep a copy in the queue's own settings member (encode() writes that member out)
+    configureImpl(settings);  // apply the settings; unlike create(), no store->create() call is made here
+}
 
+void Queue::configureImpl(const FieldTable& _settings)
+{
     eventMode = _settings.getAsInt(qpidQueueEventGeneration);
     if (eventMode && broker) {
         broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
     }
 
-    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
+    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
         (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
         if ( NullMessageStore::isNullStore(store)) {
             QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -776,32 +1004,43 @@ void Queue::configure(const FieldTable& 
         setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
     }
     if (broker && broker->getManagementAgent()) {
-        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio);
     }
 
     //set this regardless of owner to allow use of no-local with exclusive consumers also
-    noLocal = _settings.get(qpidNoLocal);
+    noLocal = getBoolSetting(_settings, qpidNoLocal);
     QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
 
     std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
     if (lvqKey.size()) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value Queue with key " << lvqKey);
         messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
-    } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
+    } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue with 'no-browse' on");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
-    } else if (_settings.get(qpidLastValueQueue)) {
+        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
+    } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
     } else {
         std::auto_ptr<Messages> m = Fairshare::create(_settings);
         if (m.get()) {
             messages = m;
+            allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
             QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
+        } else { // default (FIFO) queue type
+            // override default message allocator if message groups configured.
+            boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
+            if (mgm) {
+                allocator = mgm;
+                addObserver(mgm);
+            }
         }
     }
-    
-    persistLastNode= _settings.get(qpidPersistLastNode);
+
+    persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
     if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
 
     traceId = _settings.getAsString(qpidTraceIdentity);
@@ -809,32 +1048,32 @@ void Queue::configure(const FieldTable& 
     if (excludeList.size()) {
         split(traceExclude, excludeList, ", ");
     }
-    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId 
+    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
              << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
 
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
     autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
-    if (autoDeleteTimeout) 
-        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
+    if (autoDeleteTimeout)
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
 
-    if (mgmtObject != 0)
+    if (mgmtObject != 0) {
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
+    }
 
-    if ( isDurable() && ! getPersistenceId() && ! recovering )
-      store->create(*this, _settings);
+    QueueFlowLimit::observe(*this, _settings);
 }
 
-void Queue::destroy()
+void Queue::destroyed()
 {
+    unbind(broker->getExchanges());
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
         while(!messages->empty()){
             DeliverableMessage msg(messages->front().payload);
-            alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
-                                     msg.getMessage().getApplicationHeaders());
-            popAndDequeue();
+            alternateExchange->routeWithAlternate(msg);
+            popAndDequeue(locker);
         }
         alternateExchange->decAlternateUsers();
     }
@@ -846,6 +1085,7 @@ void Queue::destroy()
         store = 0;//ensure we make no more calls to the store for this queue
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
+    notifyDeleted();
 }
 
 void Queue::notifyDeleted()
@@ -865,9 +1105,9 @@ void Queue::bound(const string& exchange
     bindings.add(exchange, key, args);
 }
 
-void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
+void Queue::unbind(ExchangeRegistry& exchanges)
 {
-    bindings.unbind(exchanges, shared_ref);
+    bindings.unbind(exchanges, shared_from_this());  // the queue supplies its own shared_ptr instead of taking one from the caller
 }
 
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -880,9 +1120,9 @@ const QueuePolicy* Queue::getPolicy()
     return policy.get();
 }
 
-uint64_t Queue::getPersistenceId() const 
-{ 
-    return persistenceId; 
+uint64_t Queue::getPersistenceId() const
+{
+    return persistenceId;
 }
 
 void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -896,11 +1136,11 @@ void Queue::setPersistenceId(uint64_t _p
     persistenceId = _persistenceId;
 }
 
-void Queue::encode(Buffer& buffer) const 
+void Queue::encode(Buffer& buffer) const
 {
     buffer.putShortString(name);
     buffer.put(settings);
-    if (policy.get()) { 
+    if (policy.get()) {
         buffer.put(*policy);
     }
     buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -914,13 +1154,14 @@ uint32_t Queue::encodedSize() const
         + (policy.get() ? (*policy).encodedSize() : 0);
 }
 
-Queue::shared_ptr Queue::decode ( QueueRegistry& queues, Buffer& buffer, bool recovering )
+Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
 {
     string name;
     buffer.getShortString(name);
-    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
-    buffer.get(result.first->settings);
-    result.first->configure(result.first->settings, recovering );
+    FieldTable settings;
+    buffer.get(settings);
+    boost::shared_ptr<Exchange> alternate;
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true);
     if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
         buffer.get ( *(result.first->policy) );
     }
@@ -952,11 +1193,10 @@ boost::shared_ptr<Exchange> Queue::getAl
 
 void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
-    if (broker.getQueues().destroyIf(queue->getName(), 
+    if (broker.getQueues().destroyIf(queue->getName(),
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
-        queue->unbind(broker.getExchanges(), queue);
-        queue->destroy();
+        queue->destroyed();  // destroyed() performs the unbind itself, via the queue's own broker pointer
     }
 }
 
@@ -965,7 +1205,7 @@ struct AutoDeleteTask : qpid::sys::Timer
     Broker& broker;
     Queue::shared_ptr queue;
 
-    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
         : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
 
     void fire()
@@ -983,27 +1223,27 @@ void Queue::tryAutoDelete(Broker& broker
     if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
         queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
-        broker.getClusterTimer().add(queue->autoDeleteTask);        
+        broker.getClusterTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
     } else {
         tryAutoDeleteImpl(broker, queue);
     }
 }
 
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
-{ 
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
     Mutex::ScopedLock locker(ownershipLock);
-    return o == owner; 
+    return o == owner;
 }
 
-void Queue::releaseExclusiveOwnership() 
-{ 
+void Queue::releaseExclusiveOwnership()
+{
     Mutex::ScopedLock locker(ownershipLock);
-    owner = 0; 
+    owner = 0;
 }
 
-bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
-{ 
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
     //reset auto deletion timer if necessary
     if (autoDeleteTimeout && autoDeleteTask) {
         autoDeleteTask->cancel();
@@ -1012,25 +1252,25 @@ bool Queue::setExclusiveOwner(const Owne
     if (owner) {
         return false;
     } else {
-        owner = o; 
+        owner = o;
         return true;
     }
 }
 
-bool Queue::hasExclusiveOwner() const 
-{ 
+bool Queue::hasExclusiveOwner() const
+{
     Mutex::ScopedLock locker(ownershipLock);
-    return owner != 0; 
+    return owner != 0;
 }
 
-bool Queue::hasExclusiveConsumer() const 
-{ 
-    return exclusive; 
+bool Queue::hasExclusiveConsumer() const
+{
+    return exclusive;
 }
 
 void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
-    if (externalQueueStore!=inst && externalQueueStore) 
-        delete externalQueueStore; 
+    if (externalQueueStore!=inst && externalQueueStore)
+        delete externalQueueStore;
     externalQueueStore = inst;
 
     if (inst) {
@@ -1055,7 +1295,7 @@ Manageable::status_t Queue::ManagementMe
     case _qmf::Queue::METHOD_PURGE :
         {
             _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
-            purge(purgeArgs.i_request);
+            purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
             status = Manageable::STATUS_OK;
         }
         break;
@@ -1076,7 +1316,7 @@ Manageable::status_t Queue::ManagementMe
                 }
             }
 
-            purge(rerouteArgs.i_request, dest);
+            purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
             status = Manageable::STATUS_OK;
         }
         break;
@@ -1085,6 +1325,14 @@ Manageable::status_t Queue::ManagementMe
     return status;
 }
 
+
+void Queue::query(qpid::types::Variant::Map& results) const
+{
+    /** @todo add any interesting queue state into results */
+    Mutex::ScopedLock guard(messageLock);  // messageLock is held while the allocator is queried
+    if (allocator.get() != 0) allocator->query(results);  // skipped when no allocator is set
+}
+
 void Queue::setPosition(SequenceNumber n) {
     Mutex::ScopedLock locker(messageLock);
     sequence = n;
@@ -1119,7 +1367,10 @@ void Queue::insertSequenceNumbers(const 
     QPID_LOG(debug, "Inserting sequence numbers as " << key);
 }
 
-void Queue::enqueued(const QueuedMessage& m)
+/** updates queue observers and state when a message has become available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1142,7 +1393,8 @@ void Queue::updateEnqueued(const QueuedM
         if (policy.get()) {
             policy->recoverEnqueued(payload);
         }
-        enqueued(m);
+        Mutex::ScopedLock locker(messageLock);
+        observeEnqueue(m, locker);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1166,6 +1418,7 @@ void Queue::checkNotDeleted()
 
 void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
 {
+    Mutex::ScopedLock locker(messageLock);
     observers.insert(observer);
 }
 
@@ -1175,6 +1428,32 @@ void Queue::flush()
     if (u.acquired && store) store->flush(*this);
 }
 
+
+bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+                 const qpid::framing::FieldTable& arguments)
+{
+    // Nothing to record when the exchange reports that the binding was not made.
+    if (!exchange->bind(shared_from_this(), key, &arguments)) return false;
+    // Record the binding on the queue side as well.
+    bound(exchange->getName(), key, arguments);
+    // The store is only told about the binding when both the exchange and this queue are durable.
+    if (exchange->isDurable() && isDurable()) {
+        store->bind(*exchange, *this, key, arguments);
+    }
+    return true;
+}
+
+
+const Broker* Queue::getBroker()
+{
+    return broker;  // presumably null when the queue was built without a broker (the constructor argument defaults to 0)
+}
+
+void Queue::setDequeueSincePurge(uint32_t value) {
+    dequeueSincePurge = value;  // overwrites the sys::AtomicValue<uint32_t> counter of dequeues since the last purge
+}
+
+
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
 #include "qpid/broker/QueueObserver.h"
-#include "qpid/broker/RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
@@ -59,7 +59,7 @@ class MessageStore;
 class QueueEvents;
 class QueueRegistry;
 class TransactionContext;
-class Exchange;
+class MessageDistributor;
 
 /**
  * The brokers representation of an amqp queue. Messages are
@@ -74,13 +74,13 @@ class Queue : public boost::enable_share
     {
         Queue& parent;
         uint count;
-                
+
         UsageBarrier(Queue&);
         bool acquire();
         void release();
         void destroy();
     };
-            
+
     struct ScopedUse
     {
         UsageBarrier& barrier;
@@ -88,7 +88,7 @@ class Queue : public boost::enable_share
         ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
-            
+
     typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
@@ -119,7 +119,7 @@ class Queue : public boost::enable_share
     boost::shared_ptr<Exchange> alternateExchange;
     framing::SequenceNumber sequence;
     qmf::org::apache::qpid::broker::Queue* mgmtObject;
-    RateTracker dequeueTracker;
+    sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
     int eventMode;
     Observers observers;
     bool insertSeqNo;
@@ -129,26 +129,36 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+    boost::shared_ptr<MessageDistributor> allocator;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
-    bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
-    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
-    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
-    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
     void notifyListener();
 
     void removeListener(Consumer::shared_ptr);
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
-    void enqueued(const QueuedMessage& msg);
-    void dequeued(const QueuedMessage& msg);
-    void pop();
-    void popAndDequeue();
-    QueuedMessage getFront();
+    /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+     * must be held by caller */
+    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+
+    /** modify the Queue's message container - assumes messageLock held */
+    void pop(const sys::Mutex::ScopedLock& held);           // acquire front msg
+    void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
+    // acquire message @ position, return true and set msg if acquire succeeds
+    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+                 const sys::Mutex::ScopedLock& held);
+
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
+    void configureImpl(const qpid::framing::FieldTable& settings);
 
     inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
     {
@@ -172,8 +182,9 @@ class Queue : public boost::enable_share
             }
         }
     }
-            
+
     void checkNotDeleted();
+    void notifyDeleted();
 
   public:
 
@@ -182,29 +193,50 @@ class Queue : public boost::enable_share
     typedef std::vector<shared_ptr> vector;
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
-                             bool autodelete = false, 
-                             MessageStore* const store = 0, 
+                             bool autodelete = false,
+                             MessageStore* const store = 0,
                              const OwnershipToken* const owner = 0,
                              management::Manageable* parent = 0,
                              Broker* broker = 0);
     QPID_BROKER_EXTERN ~Queue();
 
+    /** allow the Consumer to consume or browse the next available message */
     QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
 
-    void create(const qpid::framing::FieldTable& settings);
+    /** allow the Consumer to acquire a message that it has browsed.
+     * @param msg - message to be acquired.
+     * @return false if message is no longer available for acquire.
+     */
+    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
 
-    // "recovering" means we are doing a MessageStore recovery.
-    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
-                                      bool recovering = false);
-    void destroy();
-    void notifyDeleted();
+    /**
+     * Used to configure a new queue and create a persistent record
+     * for it in store if required.
+     */
+    QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
+
+    /**
+     * Used to reconfigure a recovered queue (does not create
+     * persistent record in store).
+     */
+    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
+    void destroyed();
     QPID_BROKER_EXTERN void bound(const std::string& exchange,
                                   const std::string& key,
                                   const qpid::framing::FieldTable& args);
-    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
-                                   Queue::shared_ptr shared_ref);
+    //TODO: get unbind out of the public interface; only there for purposes of one unit test
+    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges);
+    /**
+     * Bind self to specified exchange, and record that binding for unbinding on delete.
+     */
+    bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+              const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
 
-    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
+    /** Acquire the message at the given position if it is available for acquire.  Not to
+     * be used by clients, but used by the broker for queue management.
+     * @param message - set to the acquired message if true returned.
+     * @return true if the message has been acquired.
+     */
     QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
 
     /**
@@ -233,11 +265,14 @@ class Queue : public boost::enable_share
                                     bool exclusive = false);
     QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
 
-    uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages 
-    QPID_BROKER_EXTERN void purgeExpired();
+    uint32_t purge(const uint32_t purge_request=0,  //defaults to all messages
+                   boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
+                   const ::qpid::types::Variant::Map *filter=0);
+    QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
 
     //move qty # of messages to destination Queue destq
-    uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
+    uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
+                  const qpid::types::Variant::Map *filter=0);
 
     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
     QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -276,8 +311,8 @@ class Queue : public boost::enable_share
      * Inform queue of messages that were enqueued, have since
      * been acquired but not yet accepted or released (and
      * thus are still logically on the queue) - used in
-     * clustered broker.  
-     */ 
+     * clustered broker.
+     */
     void updateEnqueued(const QueuedMessage& msg);
 
     /**
@@ -288,14 +323,14 @@ class Queue : public boost::enable_share
      * accepted it).
      */
     bool isEnqueued(const QueuedMessage& msg);
-            
+
     /**
-     * Gets the next available message 
+     * Acquires the next available (oldest) message
      */
     QPID_BROKER_EXTERN QueuedMessage get();
 
-    /** Get the message at position pos */
-    QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
+    /** Get the message at position pos, returns true if found and sets msg */
+    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
 
     const QueuePolicy* getPolicy();
 
@@ -309,8 +344,13 @@ class Queue : public boost::enable_share
     void encode(framing::Buffer& buffer) const;
     uint32_t encodedSize() const;
 
-    // "recovering" means we are doing a MessageStore recovery.
-    static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false );
+    /**
+     * Restores a queue from encoded data (used in recovery)
+     *
+     * Note: a restored queue will neither be auto-deleted nor have an
+     * exclusive owner.
+     */
+    static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
     static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
 
     virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -319,6 +359,7 @@ class Queue : public boost::enable_share
     management::ManagementObject* GetManagementObject (void) const;
     management::Manageable::status_t
     ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+    void query(::qpid::types::Variant::Map&) const;
 
     /** Apply f to each Message on the queue. */
     template <class F> void eachMessage(F f) {
@@ -331,6 +372,11 @@ class Queue : public boost::enable_share
         bindings.eachBinding(f);
     }
 
+    /** Apply f to each element of observers (shared pointers to QueueObserver). NOTE(review): no lock is taken here, whereas addObserver() inserts under messageLock - confirm callers serialize access. */
+    template <class F> void eachObserver(F f) {
+        std::for_each(observers.begin(), observers.end(), f);
+    }
+
     /** Set the position sequence number  for the next message on the queue.
      * Must be >= the current sequence number.
      * Used by cluster to replicate queues.
@@ -358,6 +404,11 @@ class Queue : public boost::enable_share
     void recoverPrepared(boost::intrusive_ptr<Message>& msg);
 
     void flush();
+
+    const Broker* getBroker();
+
+    uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }  // current value of the dequeues-since-last-purge counter
+    void setDequeueSincePurge(uint32_t value);
 };
 }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
 namespace qpid {
 namespace broker {
 
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
 
 QueueCleaner::~QueueCleaner()
 {
@@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner()
 
 void QueueCleaner::start(qpid::sys::Duration p)
 {
+    period = p;  // remembered so that fired() can pass it to Queue::purgeExpired
     task = new Task(*this, p);
-    timer.add(task);
+    timer->add(task);  // timer is now a pointer (replaceable through setTimer) and is dereferenced unchecked
 }
 
+void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
+    this->timer = timer;  // the parameter shadows the member, hence this->; later timer->add() calls go to the new timer
+}
+
+
 QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
 
 void QueueCleaner::Task::fire()
@@ -65,9 +71,9 @@ void QueueCleaner::fired()
     std::vector<Queue::shared_ptr> copy;
     CollectQueues collect(&copy);
     queues.eachQueue(collect);
-    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
+    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
     task->setupNextFire();
-    timer.add(task);
+    timer->add(task);
 }
 
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,14 +35,15 @@ class QueueRegistry;
 class QueueCleaner
 {
   public:
-    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
+    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
     QPID_BROKER_EXTERN ~QueueCleaner();
-    QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
+    QPID_BROKER_EXTERN void start(sys::Duration period);
+    QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
   private:
     class Task : public sys::TimerTask
     {
       public:
-        Task(QueueCleaner& parent, qpid::sys::Duration duration);
+        Task(QueueCleaner& parent, sys::Duration duration);
         void fire();
       private:
         QueueCleaner& parent;
@@ -50,7 +51,8 @@ class QueueCleaner
 
     boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
-    sys::Timer& timer;
+    sys::Timer* timer;
+    sys::Duration period;
 
     void fired();
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp Thu Oct 20 18:42:46 2011
@@ -129,6 +129,10 @@ class EventGenerator : public QueueObser
     {
         if (!enqueueOnly) manager.dequeued(m);
     }
+
+    void acquired(const QueuedMessage&) {}  // intentionally empty: nothing is done for acquire notifications
+    void requeued(const QueuedMessage&) {}  // intentionally empty: nothing is done for requeue notifications
+
   private:
     QueueEvents& manager;
     const bool enqueueOnly;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp Thu Oct 20 18:42:46 2011
@@ -26,19 +26,25 @@ namespace broker {
 
 void QueueListeners::addListener(Consumer::shared_ptr c)
 {
-    if (c->preAcquires()) {
-        add(consumers, c);
-    } else {
-        add(browsers, c);
+    if (!c->inListeners) {
+        if (c->acquires) {
+            add(consumers, c);
+        } else {
+            add(browsers, c);
+        }
+        c->inListeners = true;
     }
 }
 
 void QueueListeners::removeListener(Consumer::shared_ptr c)
 {
-    if (c->preAcquires()) {
-        remove(consumers, c);
-    } else {
-        remove(browsers, c);
+    if (c->inListeners) {
+        if (c->acquires) {
+            remove(consumers, c);
+        } else {
+            remove(browsers, c);
+        }
+        c->inListeners = false;
     }
 }
 
@@ -46,18 +52,20 @@ void QueueListeners::populate(Notificati
 {
     if (consumers.size()) {
         set.consumer = consumers.front();
-        consumers.erase(consumers.begin());
+        consumers.pop_front();
+        set.consumer->inListeners = false;
     } else {
-        // Don't swap the vectors, hang on to the memory allocated.
+        // Don't swap the deques, hang on to the memory allocated.
         set.browsers = browsers;
         browsers.clear();
+        for (Listeners::iterator i = set.browsers.begin(); i != set.browsers.end(); i++)
+            (*i)->inListeners = false;
     }
 }
 
 void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c)
 {
-    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
-    if (i == listeners.end()) listeners.push_back(c);
+    listeners.push_back(c);
 }
 
 void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c)
@@ -73,9 +81,7 @@ void QueueListeners::NotificationSet::no
 }
 
 bool QueueListeners::contains(Consumer::shared_ptr c) const {
-    return
-        std::find(browsers.begin(), browsers.end(), c) != browsers.end() ||
-        std::find(consumers.begin(), consumers.end(), c) != consumers.end();
+    return c->inListeners;
 }
 
 void QueueListeners::ListenerSet::notifyAll()

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h Thu Oct 20 18:42:46 2011
@@ -22,7 +22,7 @@
  *
  */
 #include "qpid/broker/Consumer.h"
-#include <vector>
+#include <deque>
 
 namespace qpid {
 namespace broker {
@@ -40,7 +40,7 @@ namespace broker {
 class QueueListeners
 {
   public:
-    typedef std::vector<Consumer::shared_ptr> Listeners;
+    typedef std::deque<Consumer::shared_ptr> Listeners;
 
     class NotificationSet
     {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h Thu Oct 20 18:42:46 2011
@@ -24,18 +24,52 @@
 namespace qpid {
 namespace broker {
 
-class QueuedMessage;
+struct QueuedMessage;
+class Consumer;
+
 /**
- * Interface for notifying classes who want to act as 'observers' of a
- * queue of particular events.
+ * Interface for notifying classes who want to act as 'observers' of a queue of particular
+ * events.
+ *
+ * The events that are monitored reflect the relationship between a particular message and
+ * the queue it has been delivered to.  A message can be considered in one of three states
+ * with respect to the queue:
+ *
+ * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
+ *
+ * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
+ * (by either browse or acquire), but still considered on the queue.
+ *
+ * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
+ *
+ * The queue events that are observable are:
+ *
+ * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
+ * (e.g. browse or acquire)
+ *
+ * "Acquired" - a consumer has claimed exclusive access to it. It is no longer available
+ * for other consumers to browse or acquire, but it is not yet considered dequeued as it
+ * may be requeued by the consumer.
+ *
+ * "Requeued" - a previously-acquired message is released by its owner: it is put back on
+ * the queue at its original position and returns to the "Available" state.
+ *
+ * "Dequeued" - a message is no longer queued.  At this point, the queue no longer tracks
+ * the message, and the broker considers the consumer's transaction complete.
  */
 class QueueObserver
 {
   public:
     virtual ~QueueObserver() {}
+
+    // note: the Queue will hold the messageLock while calling these methods!
     virtual void enqueued(const QueuedMessage&) = 0;
     virtual void dequeued(const QueuedMessage&) = 0;
-  private:
+    virtual void acquired(const QueuedMessage&) = 0;
+    virtual void requeued(const QueuedMessage&) = 0;
+    virtual void consumerAdded( const Consumer& ) {};
+    virtual void consumerRemoved( const Consumer& ) {};
+ private:
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp Thu Oct 20 18:42:46 2011
@@ -117,30 +117,30 @@ void QueuePolicy::update(FieldTable& set
     settings.setString(typeKey, type);
 }
 
-uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
+template <typename T>
+T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue)
 {
     FieldTable::ValuePtr v = settings.get(key);
 
-    int32_t result = 0;
+    T result = 0;
 
     if (!v) return defaultValue;
     if (v->getType() == 0x23) {
         QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
     } else if (v->getType() == 0x33) {
         QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
-    } else if (v->convertsTo<int>()) {
-        result = v->get<int>();
+    } else if (v->convertsTo<T>()) {
+        result = v->get<T>();
         QPID_LOG(debug, "Got integer value for " << key << ": " << result);
         if (result >= 0) return result;
     } else if (v->convertsTo<string>()) {
         string s(v->get<string>());
         QPID_LOG(debug, "Got string value for " << key << ": " << s);
         std::istringstream convert(s);
-        if (convert >> result && result >= 0) return result;
+        if (convert >> result && result >= 0 && convert.eof()) return result;
     }
 
-    QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
-    return defaultValue;
+    throw IllegalArgumentException(QPID_MSG("Cannot convert " << key << " to unsigned integer: " << *v));
 }
 
 std::string QueuePolicy::getType(const FieldTable& settings)
@@ -247,7 +247,7 @@ bool RingQueuePolicy::checkLimit(boost::
 {
 
     // If the message is bigger than the queue size, give up
-    if (m->contentSize() > getMaxSize()) {
+    if (getMaxSize() && m->contentSize() > getMaxSize()) {
         QPID_LOG(debug, "Message too large for ring queue " << name 
                  << " [" << *this  << "] "
                  << ": message size = " << m->contentSize() << " bytes"
@@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::
 
     do {
         QueuedMessage oldest  = queue.front();
-
-        if (oldest.queue->acquire(oldest) || !strict) {
+        if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
             queue.pop_front();
             pendingDequeues.push_back(oldest);
             QPID_LOG(debug, "Ring policy triggered in " << name 
@@ -320,8 +319,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::
 
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
 {
-    uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
-    uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+    uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0);
+    uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize);
     if (maxCount || maxSize) {
         return createQueuePolicy(name, maxCount, maxSize, getType(settings));
     } else {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h Thu Oct 20 18:42:46 2011
@@ -43,8 +43,7 @@ class QueuePolicy
     uint32_t count;
     uint64_t size;
     bool policyExceeded;
-            
-    static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
+
 
   protected:
     uint64_t getCurrentQueueSize() const { return size; } 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp Thu Oct 20 18:42:46 2011
@@ -21,6 +21,7 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/Exchange.h"
 #include "qpid/log/Statement.h"
 #include <sstream>
 #include <assert.h>
@@ -36,7 +37,13 @@ QueueRegistry::~QueueRegistry(){}
 
 std::pair<Queue::shared_ptr, bool>
 QueueRegistry::declare(const string& declareName, bool durable, 
-                       bool autoDelete, const OwnershipToken* owner)
+                       bool autoDelete, const OwnershipToken* owner,
+                       boost::shared_ptr<Exchange> alternate,
+                       const qpid::framing::FieldTable& arguments,
+                       bool recovering/*true if this declare is a
+                                        result of recovering queue
+                                        definition from persistent
+                                        record*/)
 {
     RWlock::ScopedWlock locker(lock);
     string name = declareName.empty() ? generateName() : declareName;
@@ -45,6 +52,17 @@ QueueRegistry::declare(const string& dec
 
     if (i == queues.end()) {
         Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+        if (alternate) {
+            queue->setAlternateExchange(alternate);//need to do this *before* create
+            alternate->incAlternateUsers();
+        }
+        if (!recovering) {
+            //apply settings & create persistent record if required
+            queue->create(arguments);
+        } else {
+            //i.e. recovering a queue for which we already have a persistent record
+            queue->configure(arguments);
+        }
         queues[name] = queue;
         if (lastNode) queue->setLastNodeFailure();
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h Thu Oct 20 18:42:46 2011
@@ -24,6 +24,7 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/management/Manageable.h"
+#include "qpid/framing/FieldTable.h"
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
 #include <algorithm>
@@ -34,6 +35,7 @@ namespace broker {
 
 class Queue;
 class QueueEvents;
+class Exchange;
 class OwnershipToken;
 class Broker;
 class MessageStore;
@@ -60,7 +62,10 @@ class QueueRegistry {
         const std::string& name,
         bool durable = false,
         bool autodelete = false, 
-        const OwnershipToken* owner = 0);
+        const OwnershipToken* owner = 0,
+        boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
+        const qpid::framing::FieldTable& args = framing::FieldTable(),
+        bool recovering = false);
 
     /**
      * Destroy the named queue.

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp Thu Oct 20 18:42:46 2011
@@ -43,7 +43,6 @@ void RecoveredDequeue::commit() throw()
 
 void RecoveredDequeue::rollback() throw()
 {
-    msg->enqueueComplete();
     queue->process(msg);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp Thu Oct 20 18:42:46 2011
@@ -36,7 +36,6 @@ bool RecoveredEnqueue::prepare(Transacti
 }
 
 void RecoveredEnqueue::commit() throw(){
-    msg->enqueueComplete();
     queue->process(msg);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu Oct 20 18:42:46 2011
@@ -113,7 +113,7 @@ RecoverableExchange::shared_ptr Recovery
 
 RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
 {
-    Queue::shared_ptr queue = Queue::decode(queues, buffer, true);
+    Queue::shared_ptr queue = Queue::restore(queues, buffer);
     try {
         Exchange::shared_ptr exchange = exchanges.getDefault();
         if (exchange) {
@@ -252,7 +252,6 @@ void RecoverableMessageImpl::dequeue(Dtx
 
 void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
 {
-    msg->enqueueComplete(); // recoved nmessage to enqueued in store already
     buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp Thu Oct 20 18:42:46 2011
@@ -30,6 +30,7 @@
 #include <boost/format.hpp>
 
 #if HAVE_SASL
+#include <sys/stat.h>
 #include <sasl/sasl.h>
 #include "qpid/sys/cyrus/CyrusSecurityLayer.h"
 using qpid::sys::cyrus::CyrusSecurityLayer;
@@ -57,7 +58,7 @@ public:
     NullAuthenticator(Connection& connection, bool encrypt);
     ~NullAuthenticator();
     void getMechanisms(framing::Array& mechanisms);
-    void start(const std::string& mechanism, const std::string& response);
+    void start(const std::string& mechanism, const std::string* response);
     void step(const std::string&) {}
     std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
 };
@@ -81,7 +82,7 @@ public:
     ~CyrusAuthenticator();
     void init();
     void getMechanisms(framing::Array& mechanisms);
-    void start(const std::string& mechanism, const std::string& response);
+    void start(const std::string& mechanism, const std::string* response);
     void step(const std::string& response);
     void getError(std::string& error);
     void getUid(std::string& uid) { getUsername(uid); }
@@ -98,11 +99,33 @@ void SaslAuthenticator::init(const std::
     //  Check if we have a version of SASL that supports sasl_set_path()
 #if (SASL_VERSION_FULL >= ((2<<16)|(1<<8)|22))
     //  If we are not given a sasl path, do nothing and allow the default to be used.
-    if ( ! saslConfigPath.empty() ) {
-        int code = sasl_set_path(SASL_PATH_TYPE_CONFIG,
-                                 const_cast<char *>(saslConfigPath.c_str()));
+    if ( saslConfigPath.empty() ) {
+        QPID_LOG ( info, "SASL: no config path set - using default." );
+    }
+    else {
+        struct stat st;
+
+        // Make sure the directory exists and we can read up to it.
+        if ( ::stat ( saslConfigPath.c_str(), & st) ) {
+          // Note: not using strerror() here because I think its messages are a little too hazy.
+          if ( errno == ENOENT )
+              throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: no such directory: " << saslConfigPath ) );
+          if ( errno == EACCES )
+              throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot read parent of: " << saslConfigPath ) );
+          // catch-all stat failure
+          throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot stat: " << saslConfigPath ) );
+        }
+
+        // Make sure the directory is readable.
+        if ( ::access ( saslConfigPath.c_str(), R_OK ) ) {
+            throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: directory not readable:" << saslConfigPath ) );
+        }
+
+        // This shouldn't fail now, but check anyway.
+        int code = sasl_set_path(SASL_PATH_TYPE_CONFIG, const_cast<char *>(saslConfigPath.c_str()));
         if(SASL_OK != code)
             throw Exception(QPID_MSG("SASL: sasl_set_path failed [" << code << "] " ));
+
         QPID_LOG(info, "SASL: config path set to " << saslConfigPath );
     }
 #endif
@@ -164,7 +187,7 @@ void NullAuthenticator::getMechanisms(Ar
     mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing
 }
 
-void NullAuthenticator::start(const string& mechanism, const string& response)
+void NullAuthenticator::start(const string& mechanism, const string* response)
 {
     if (encrypt) {
 #if HAVE_SASL
@@ -180,16 +203,16 @@ void NullAuthenticator::start(const stri
         }
     }
     if (mechanism == "PLAIN") { // Old behavior
-        if (response.size() > 0) {
+        if (response && response->size() > 0) {
             string uid;
-            string::size_type i = response.find((char)0);
-            if (i == 0 && response.size() > 1) {
+            string::size_type i = response->find((char)0);
+            if (i == 0 && response->size() > 1) {
                 //no authorization id; use authentication id
-                i = response.find((char)0, 1);
-                if (i != string::npos) uid = response.substr(1, i-1);
+                i = response->find((char)0, 1);
+                if (i != string::npos) uid = response->substr(1, i-1);
             } else if (i != string::npos) {
                 //authorization id is first null delimited field
-                uid = response.substr(0, i);
+                uid = response->substr(0, i);
             }//else not a valid SASL PLAIN response, throw error?            
             if (!uid.empty()) {
                 //append realm if it has not already been added
@@ -376,18 +399,22 @@ void CyrusAuthenticator::getMechanisms(A
     }
 }
 
-void CyrusAuthenticator::start(const string& mechanism, const string& response)
+void CyrusAuthenticator::start(const string& mechanism, const string* response)
 {
     const char *challenge;
     unsigned int challenge_len;
     
-    QPID_LOG(debug, "SASL: Starting authentication with mechanism: " << mechanism);
+    // This should be at the same log level as the mechanism list in getMechanisms().
+    QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
     int code = sasl_server_start(sasl_conn,
                                  mechanism.c_str(),
-                                 response.c_str(), response.length(),
+                                 (response ? response->c_str() : 0), (response ? response->size() : 0),
                                  &challenge, &challenge_len);
     
     processAuthenticationStep(code, challenge, challenge_len);
+    qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
+    if ( cnxMgmt ) 
+        cnxMgmt->set_saslMechanism(mechanism);
 }
         
 void CyrusAuthenticator::step(const string& response)
@@ -424,10 +451,12 @@ void CyrusAuthenticator::processAuthenti
         client.secure(challenge_str);
     } else {
         std::string uid;
+        //save error detail before trying to retrieve username as error in doing so will overwrite it
+        std::string errordetail = sasl_errdetail(sasl_conn);
         if (!getUsername(uid)) {
-            QPID_LOG(info, "SASL: Authentication failed (no username available):" << sasl_errdetail(sasl_conn));
+            QPID_LOG(info, "SASL: Authentication failed (no username available yet):" << errordetail);
         } else {
-            QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << sasl_errdetail(sasl_conn));
+            QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << errordetail);
         }
 
         // TODO: Change to more specific exceptions, when they are
@@ -459,6 +488,9 @@ std::auto_ptr<SecurityLayer> CyrusAuthen
     if (ssf) {
         securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
     }
+    qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
+    if ( cnxMgmt ) 
+        cnxMgmt->set_saslSsf(ssf);
     return securityLayer;
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h Thu Oct 20 18:42:46 2011
@@ -41,7 +41,7 @@ class SaslAuthenticator
 public:
     virtual ~SaslAuthenticator() {}
     virtual void getMechanisms(framing::Array& mechanisms) = 0;
-    virtual void start(const std::string& mechanism, const std::string& response) = 0;
+    virtual void start(const std::string& mechanism, const std::string* response) = 0;
     virtual void step(const std::string& response) = 0;
     virtual void getUid(std::string&) {}
     virtual bool getUsername(std::string&) { return false; };



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org