You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [9/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,10 +31,7 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/MessageGroupManager.h"
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
@@ -44,7 +41,6 @@
 #include "qpid/sys/ClusterSafe.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
-#include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
 
@@ -68,7 +64,7 @@ using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 
-namespace
+namespace 
 {
 const std::string qpidMaxSize("qpid.max_size");
 const std::string qpidMaxCount("qpid.max_count");
@@ -90,16 +86,16 @@ const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
 }
 
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
              const OwnershipToken* const _owner,
              Manageable* parent,
              Broker* b) :
 
-    name(_name),
+    name(_name), 
     autodelete(_autodelete),
     store(_store),
-    owner(_owner),
+    owner(_owner), 
     consumerCount(0),
     exclusive(0),
     noLocal(false),
@@ -114,8 +110,7 @@ Queue::Queue(const string& _name, bool _
     broker(b),
     deleted(false),
     barrier(*this),
-    autoDeleteTimeout(0),
-    allocator(new FifoDistributor( *messages ))
+    autoDeleteTimeout(0)
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -168,8 +163,13 @@ void Queue::deliver(boost::intrusive_ptr
         //drop message
         QPID_LOG(info, "Dropping excluded message from " << getName());
     } else {
-        enqueue(0, msg);
-        push(msg);
+        // if no store then mark as enqueued
+        if (!enqueue(0, msg)){
+            push(msg);
+            msg->enqueueComplete();
+        }else {
+            push(msg);
+        }
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
@@ -183,10 +183,11 @@ void Queue::recover(boost::intrusive_ptr
     if (policy.get()) policy->recoverEnqueued(msg);
 
     push(msg, true);
-    if (store){
+    if (store){ 
         // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
-        msg->addToSyncList(shared_from_this(), store);
+        msg->addToSyncList(shared_from_this(), store); 
     }
+    msg->enqueueComplete(); // mark the message as enqueued
 
     if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
@@ -210,13 +211,14 @@ void Queue::process(boost::intrusive_ptr
 void Queue::requeue(const QueuedMessage& msg){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    {
+    {    
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
+        msg.payload->enqueueComplete(); // mark the message as enqueued
         messages->reinsert(msg);
         listeners.populate(copy);
 
-        // for persistLastNode - don't force a message twice to disk, but force it if no force before
+        // for persistLastNode - don't force a message twice to disk, but force it if no force before 
         if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
             msg.payload->forcePersistent();
             if (msg.payload->isForcedPersistent() ){
@@ -224,17 +226,16 @@ void Queue::requeue(const QueuedMessage&
             	enqueue(0, payload);
             }
         }
-        observeRequeue(msg, locker);
     }
     copy.notify();
 }
 
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
 {
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (acquire(position, message, locker)) {
+    if (messages->remove(position, message)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
     } else {
@@ -243,24 +244,9 @@ bool Queue::acquireMessageAt(const Seque
     }
 }
 
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
-{
-    Mutex::ScopedLock locker(messageLock);
-    assertClusterSafe();
-    QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-
-    if (!allocator->allocate( consumer, msg )) {
-        QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
-        return false;
-    }
-
-    QueuedMessage copy(msg);
-    if (acquire( msg.position, copy, locker)) {
-        QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
-        return true;
-    }
-    QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
-    return false;
+bool Queue::acquire(const QueuedMessage& msg) {
+    QueuedMessage copy = msg;
+    return acquireMessageAt(msg.position, copy);
 }
 
 void Queue::notifyListener()
@@ -276,7 +262,7 @@ void Queue::notifyListener()
     set.notify();
 }
 
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     checkNotDeleted();
     if (c->preAcquires()) {
@@ -288,71 +274,52 @@ bool Queue::getNextMessage(QueuedMessage
           case NO_MESSAGES:
           default:
             return false;
-        }
+        }        
     } else {
         return browseNextMessage(m, c);
     }
 }
 
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        QueuedMessage msg;
-
-        if (!allocator->nextConsumableMessage(c, msg)) { // no next available
-            QPID_LOG(debug, "No messages available to dispatch to consumer " <<
-                     c->getName() << " on queue '" << name << "'");
+        if (messages->empty()) { 
+            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
-        }
-
-        if (msg.payload->hasExpired()) {
-            QPID_LOG(debug, "Message expired from queue '" << name << "'");
-            c->position = msg.position;
-            acquire( msg.position, msg, locker);
-            dequeue( 0, msg );
-            continue;
-        }
-
-        // a message is available for this consumer - can the consumer use it?
+        } else {
+            QueuedMessage msg = messages->front();
+            if (msg.payload->hasExpired()) {
+                QPID_LOG(debug, "Message expired from queue '" << name << "'");
+                popAndDequeue();
+                continue;
+            }
 
-        if (c->filter(msg.payload)) {
-            if (c->accept(msg.payload)) {
-                bool ok = allocator->allocate( c->getName(), msg );  // inform allocator
-                (void) ok; assert(ok);
-                ok = acquire( msg.position, msg, locker);
-                (void) ok; assert(ok);
-                m = msg;
-                c->position = m.position;
-                return CONSUMED;
+            if (c->filter(msg.payload)) {
+                if (c->accept(msg.payload)) {            
+                    m = msg;
+                    pop();
+                    return CONSUMED;
+                } else {
+                    //message(s) are available but consumer hasn't got enough credit
+                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                    return CANT_CONSUME;
+                }
             } else {
-                //message(s) are available but consumer hasn't got enough credit
-                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                //consumer will never want this message
+                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
                 return CANT_CONSUME;
-            }
-        } else {
-            //consumer will never want this message
-            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
-            c->position = msg.position;
-            return CANT_CONSUME;
+            } 
         }
     }
 }
 
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
-    while (true) {
-        Mutex::ScopedLock locker(messageLock);
-        QueuedMessage msg;
-
-        if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
-            QPID_LOG(debug, "No browsable messages available for consumer " <<
-                     c->getName() << " on queue '" << name << "'");
-            listeners.addListener(c);
-            return false;
-        }
 
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+{
+    QueuedMessage msg(this);
+    while (seek(msg, c)) {
         if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
             if (c->accept(msg.payload)) {
                 //consumer wants the message
@@ -366,8 +333,8 @@ bool Queue::browseNextMessage(QueuedMess
             }
         } else {
             //consumer will never want this message, continue seeking
-            QPID_LOG(debug, "Browser skipping message from '" << name << "'");
             c->position = msg.position;
+            QPID_LOG(debug, "Browser skipping message from '" << name << "'");
         }
     }
     return false;
@@ -397,71 +364,61 @@ bool Queue::dispatch(Consumer::shared_pt
     }
 }
 
-bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
-
+// Find the next message 
+bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
-    if (messages->find(pos, msg))
+    if (messages->next(c->position, msg)) {
         return true;
-    return false;
+    } else {
+        listeners.addListener(c);
+        return false;
+    }
+}
+
+QueuedMessage Queue::find(SequenceNumber pos) const {
+
+    Mutex::ScopedLock locker(messageLock);
+    QueuedMessage msg;
+    messages->find(pos, msg);
+    return msg;
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     assertClusterSafe();
-    {
-        Mutex::ScopedLock locker(consumerLock);
-        if(exclusive) {
+    Mutex::ScopedLock locker(consumerLock);
+    if(exclusive) {
+        throw ResourceLockedException(
+            QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+    } else if(requestExclusive) {
+        if(consumerCount) {
             throw ResourceLockedException(
-                                          QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-        } else if(requestExclusive) {
-            if(consumerCount) {
-                throw ResourceLockedException(
-                                              QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-            } else {
-                exclusive = c->getSession();
-            }
-        }
-        consumerCount++;
-        if (mgmtObject != 0)
-            mgmtObject->inc_consumerCount ();
-        //reset auto deletion timer if necessary
-        if (autoDeleteTimeout && autoDeleteTask) {
-            autoDeleteTask->cancel();
+                QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+        } else {
+            exclusive = c->getSession();
         }
     }
-    Mutex::ScopedLock locker(messageLock);
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->consumerAdded(*c);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
-        }
+    consumerCount++;
+    if (mgmtObject != 0)
+        mgmtObject->inc_consumerCount ();
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
     }
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
-    {
-        Mutex::ScopedLock locker(consumerLock);
-        consumerCount--;
-        if(exclusive) exclusive = 0;
-        if (mgmtObject != 0)
-            mgmtObject->dec_consumerCount ();
-    }
-    Mutex::ScopedLock locker(messageLock);
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->consumerRemoved(*c);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
-        }
-    }
+    Mutex::ScopedLock locker(consumerLock);
+    consumerCount--;
+    if(exclusive) exclusive = 0;
+    if (mgmtObject != 0)
+        mgmtObject->dec_consumerCount ();
 }
 
 QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    if (messages->pop(msg))
-        observeAcquire(msg, locker);
+    messages->pop(msg);
     return msg;
 }
 
@@ -475,135 +432,22 @@ bool collect_if_expired(std::deque<Queue
     }
 }
 
-/**
- *@param lapse: time since the last purgeExpired
- */
-void Queue::purgeExpired(qpid::sys::Duration lapse)
+void Queue::purgeExpired()
 {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
-    //attempt is less than one per second.
-    int count = dequeueSincePurge.get();
-    dequeueSincePurge -= count;
-    int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
-    if (seconds == 0 || count / seconds < 1) {
+    //attempt is less than one per second.  
+
+    if (dequeueTracker.sampleRatePerSecond() < 1) {
         std::deque<QueuedMessage> expired;
         {
             Mutex::ScopedLock locker(messageLock);
-            messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
-        }
-
-        for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
-             i != expired.end(); ++i) {
-            {
-                Mutex::ScopedLock locker(messageLock);
-                observeAcquire(*i, locker);
-            }
-            dequeue( 0, *i );
+            messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
         }
+        for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 }
 
-
-namespace {
-    // for use with purge/move below - collect messages that match a given filter
-    //
-    class MessageFilter
-    {
-    public:
-        static const std::string typeKey;
-        static const std::string paramsKey;
-        static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
-        virtual bool match( const QueuedMessage& ) const { return true; }
-        virtual ~MessageFilter() {}
-    protected:
-        MessageFilter() {};
-    };
-    const std::string MessageFilter::typeKey("filter_type");
-    const std::string MessageFilter::paramsKey("filter_params");
-
-    // filter by message header string value exact match
-    class HeaderMatchFilter : public MessageFilter
-    {
-    public:
-        /* Config:
-           { 'filter_type' : 'header_match_str',
-             'filter_params' : { 'header_key' : "<header name>",
-                                 'header_value' : "<value to match>"
-                               }
-           }
-        */
-        static const std::string typeKey;
-        static const std::string headerKey;
-        static const std::string valueKey;
-        HeaderMatchFilter( const std::string& _header, const std::string& _value )
-            : MessageFilter (), header(_header), value(_value) {}
-        bool match( const QueuedMessage& msg ) const
-        {
-            const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
-            if (!headers) return false;
-            FieldTable::ValuePtr h = headers->get(header);
-            if (!h || !h->convertsTo<std::string>()) return false;
-            return h->get<std::string>() == value;
-        }
-    private:
-        const std::string header;
-        const std::string value;
-    };
-    const std::string HeaderMatchFilter::typeKey("header_match_str");
-    const std::string HeaderMatchFilter::headerKey("header_key");
-    const std::string HeaderMatchFilter::valueKey("header_value");
-
-    // factory to create correct filter based on map
-    MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
-    {
-        using namespace qpid::types;
-        if (filter && !filter->empty()) {
-            Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
-            if (i != filter->end()) {
-
-                if (i->second.asString() == HeaderMatchFilter::typeKey) {
-                    Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
-                    if (p != filter->end() && p->second.getType() == VAR_MAP) {
-                        Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
-                        Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
-                        if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
-                            std::string headerKey(k->second.asString());
-                            std::string value(v->second.asString());
-                            QPID_LOG(debug, "Message filtering by header value configured.  key: " << headerKey << " value: " << value );
-                            return new HeaderMatchFilter( headerKey, value );
-                        }
-                    }
-                }
-            }
-            QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
-        }
-        return new MessageFilter();
-    }
-
-    // used by removeIf() to collect all messages matching a filter, maximum match count is
-    // optional.
-    struct Collector {
-        const uint32_t maxMatches;
-        MessageFilter& filter;
-        std::deque<QueuedMessage> matches;
-        Collector(MessageFilter& filter, uint32_t max)
-            : maxMatches(max), filter(filter) {}
-        bool operator() (QueuedMessage& qm)
-        {
-            if (maxMatches == 0 || matches.size() < maxMatches) {
-                if (filter.match( qm )) {
-                    matches.push_back(qm);
-                    return true;
-                }
-            }
-            return false;
-        }
-    };
-
-} // end namespace
-
-
 /**
  * purge - for purging all or some messages on a queue
  *         depending on the purge_request
@@ -615,77 +459,63 @@ namespace {
  * The dest exchange may be supplied to re-route messages through the exchange.
  * It is safe to re-route messages such that they arrive back on the same queue,
  * even if the queue is ordered by priority.
- *
- * An optional filter can be supplied that will be applied against each message.  The
- * message is purged only if the filter matches.  See MessageDistributor for more detail.
  */
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
-                      const qpid::types::Variant::Map *filter)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
 {
-    std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
-    Collector c(*mf.get(), purge_request);
-
     Mutex::ScopedLock locker(messageLock);
-    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
-    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); ++qmsg) {
-        // Update observers and message state:
-        observeAcquire(*qmsg, locker);
-        dequeue(0, *qmsg);
-        // now reroute if necessary
+    uint32_t purge_count = purge_request; // only comes into play if  >0 
+    std::deque<DeliverableMessage> rerouteQueue;
+
+    uint32_t count = 0;
+    // Either purge them all or just the some (purge_count) while the queue isn't empty.
+    while((!purge_request || purge_count--) && !messages->empty()) {
         if (dest.get()) {
-            assert(qmsg->payload);
-            DeliverableMessage dmsg(qmsg->payload);
-            dest->routeWithAlternate(dmsg);
+            //
+            // If there is a destination exchange, stage the messages onto a reroute queue
+            // so they don't wind up getting purged more than once.
+            //
+            DeliverableMessage msg(messages->front().payload);
+            rerouteQueue.push_back(msg);
         }
+        popAndDequeue();
+        count++;
     }
-    return c.matches.size();
-}
 
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
-                     const qpid::types::Variant::Map *filter)
-{
-    std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
-    Collector c(*mf.get(), qty);
+    //
+    // Re-route purged messages into the destination exchange.  Note that there's no need
+    // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
+    //
+    while (!rerouteQueue.empty()) {
+        DeliverableMessage msg(rerouteQueue.front());
+        rerouteQueue.pop_front();
+        dest->route(msg, msg.getMessage().getRoutingKey(),
+                    msg.getMessage().getApplicationHeaders());
+    }
+
+    return count;
+}
 
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
     Mutex::ScopedLock locker(messageLock);
-    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    uint32_t move_count = qty; // only comes into play if  qty >0 
+    uint32_t count = 0; // count how many were moved for returning
 
-    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); ++qmsg) {
-        // Update observers and message state:
-        observeAcquire(*qmsg, locker);
-        dequeue(0, *qmsg);
-        // and move to destination Queue.
-        assert(qmsg->payload);
-        destq->deliver(qmsg->payload);
+    while((!qty || move_count--) && !messages->empty()) {
+        QueuedMessage qmsg = messages->front();
+        boost::intrusive_ptr<Message> msg = qmsg.payload;
+        destq->deliver(msg); // deliver message to the destination queue
+        pop();
+        dequeue(0, qmsg);
+        count++;
     }
-    return c.matches.size();
+    return count;
 }
 
-/** Acquire the front (oldest) message from the in-memory queue.
- * assumes messageLock held by caller
- */
-void Queue::pop(const Mutex::ScopedLock& locker)
+void Queue::pop()
 {
     assertClusterSafe();
-    QueuedMessage msg;
-    if (messages->pop(msg)) {
-        observeAcquire(msg, locker);
-        ++dequeueSincePurge;
-    }
-}
-
-/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
-                    const Mutex::ScopedLock& locker)
-{
-    if (messages->remove(position, msg)) {
-        observeAcquire(msg, locker);
-        ++dequeueSincePurge;
-        return true;
-    }
-    return false;
+    messages->pop();
+    ++dequeueTracker;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -694,15 +524,13 @@ void Queue::push(boost::intrusive_ptr<Me
     QueuedMessage removed;
     bool dequeueRequired = false;
     {
-        Mutex::ScopedLock locker(messageLock);
+        Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
-        if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
-
+        if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
+         
         dequeueRequired = messages->push(qm, removed);
-        if (dequeueRequired)
-            observeAcquire(removed, locker);
         listeners.populate(copy);
-        observeEnqueue(qm, locker);
+        enqueued(qm);
     }
     copy.notify();
     if (dequeueRequired) {
@@ -718,7 +546,7 @@ void Queue::push(boost::intrusive_ptr<Me
 
 void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
 {
-    if (message.payload->isIngressComplete()) (*result)++;
+    if (message.payload->isEnqueueComplete()) (*result)++;
 }
 
 /** function only provided for unit tests, or code not in critical message path */
@@ -778,7 +606,7 @@ void Queue::setLastNodeFailure()
 }
 
 
-// return true if store exists,
+// return true if store exists, 
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
 {
     ScopedUse u(barrier);
@@ -792,21 +620,24 @@ bool Queue::enqueue(TransactionContext* 
             policy->getPendingDequeues(dequeues);
         }
         //depending on policy, may have some dequeues that need to performed without holding the lock
-        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));        
     }
 
     if (inLastNodeFailure && persistLastNode){
         msg->forcePersistent();
     }
-
+       
     if (traceId.size()) {
+        //copy on write: take deep copy of message before modifying it
+        //as the frames may already be available for delivery on other
+        //threads
+        boost::intrusive_ptr<Message> copy(new Message(*msg));
+        msg = copy;
         msg->addTraceId(traceId);
     }
 
     if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
-        // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
-        // when it considers the message stored.
-        msg->enqueueAsync(shared_from_this(), store);
+        msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;
@@ -823,10 +654,10 @@ bool Queue::enqueue(TransactionContext* 
 void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->enqueueAborted(msg);
+    if (policy.get()) policy->enqueueAborted(msg);       
 }
 
-// return true if store exists,
+// return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
     ScopedUse u(barrier);
@@ -835,8 +666,8 @@ bool Queue::dequeue(TransactionContext* 
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
-        if (!ctxt) {
-            observeDequeue(msg, locker);
+        if (!ctxt) { 
+            dequeued(msg);
         }
     }
     // This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -856,7 +687,7 @@ bool Queue::dequeue(TransactionContext* 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    observeDequeue(msg, locker);
+    dequeued(msg);    
     if (mgmtObject != 0) {
         mgmtObject->inc_msgTxnDequeues();
         mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -864,23 +695,21 @@ void Queue::dequeueCommitted(const Queue
 }
 
 /**
- * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
- * it from the logical (and persistent if applicable) queue
+ * Removes a message from the in-memory delivery queue as well
+ * dequeing it from the logical (and persistent if applicable) queue
  */
-void Queue::popAndDequeue(const Mutex::ScopedLock& held)
+void Queue::popAndDequeue()
 {
-    if (!messages->empty()) {
-        QueuedMessage msg = messages->front();
-        pop(held);
-        dequeue(0, msg);
-    }
+    QueuedMessage msg = messages->front();
+    pop();
+    dequeue(0, msg);
 }
 
 /**
  * Updates policy and management when a message has been dequeued,
  * expects messageLock to be held
  */
-void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::dequeued(const QueuedMessage& msg)
 {
     if (policy.get()) policy->dequeued(msg);
     mgntDeqStats(msg.payload);
@@ -893,33 +722,6 @@ void Queue::observeDequeue(const QueuedM
     }
 }
 
-/** updates queue observers when a message has become unavailable for transfer,
- * expects messageLock to be held
- */
-void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
-{
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->acquired(msg);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
-        }
-    }
-}
-
-/** updates queue observers when a message has become re-available for transfer,
- * expects messageLock to be held
- */
-void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
-{
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->requeued(msg);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
-        }
-    }
-}
 
 void Queue::create(const FieldTable& _settings)
 {
@@ -927,7 +729,7 @@ void Queue::create(const FieldTable& _se
     if (store) {
         store->create(*this, _settings);
     }
-    configureImpl(_settings);
+    configure(_settings);
 }
 
 
@@ -940,8 +742,8 @@ int getIntegerSetting(const qpid::framin
         return v->get<int>();
     } else if (v->convertsTo<std::string>()){
         std::string s = v->get<std::string>();
-        try {
-            return boost::lexical_cast<int>(s);
+        try { 
+            return boost::lexical_cast<int>(s); 
         } catch(const boost::bad_lexical_cast&) {
             QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
             return 0;
@@ -952,45 +754,15 @@ int getIntegerSetting(const qpid::framin
     }
 }
 
-bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+void Queue::configure(const FieldTable& _settings, bool recovering)
 {
-    qpid::framing::FieldTable::ValuePtr v = settings.get(key);
-    if (!v) {
-        return false;
-    } else if (v->convertsTo<int>()) {
-        return v->get<int>() != 0;
-    } else if (v->convertsTo<std::string>()){
-        std::string s = v->get<std::string>();
-        if (s == "True")  return true;
-        if (s == "true")  return true;
-        if (s == "False") return false;
-        if (s == "false") return false;
-        try {
-            return boost::lexical_cast<bool>(s);
-        } catch(const boost::bad_lexical_cast&) {
-            QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s);
-            return false;
-        }
-    } else {
-        QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v);
-        return false;
-    }
-}
-
-void Queue::configure(const FieldTable& _settings)
-{
-    settings = _settings;
-    configureImpl(settings);
-}
 
-void Queue::configureImpl(const FieldTable& _settings)
-{
     eventMode = _settings.getAsInt(qpidQueueEventGeneration);
     if (eventMode && broker) {
         broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
     }
 
-    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
         (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
         if ( NullMessageStore::isNullStore(store)) {
             QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -1004,43 +776,32 @@ void Queue::configureImpl(const FieldTab
         setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
     }
     if (broker && broker->getManagementAgent()) {
-        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio);
+        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
     }
 
     //set this regardless of owner to allow use of no-local with exclusive consumers also
-    noLocal = getBoolSetting(_settings, qpidNoLocal);
+    noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
 
     std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
     if (lvqKey.size()) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value Queue with key " << lvqKey);
         messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
-        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
-    } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
+    } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue with 'no-browse' on");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
-        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
-    } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
+    } else if (_settings.get(qpidLastValueQueue)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
-        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
     } else {
         std::auto_ptr<Messages> m = Fairshare::create(_settings);
         if (m.get()) {
             messages = m;
-            allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
             QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
-        } else { // default (FIFO) queue type
-            // override default message allocator if message groups configured.
-            boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
-            if (mgm) {
-                allocator = mgm;
-                addObserver(mgm);
-            }
         }
     }
-
-    persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
+    
+    persistLastNode= _settings.get(qpidPersistLastNode);
     if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
 
     traceId = _settings.getAsString(qpidTraceIdentity);
@@ -1048,32 +809,32 @@ void Queue::configureImpl(const FieldTab
     if (excludeList.size()) {
         split(traceExclude, excludeList, ", ");
     }
-    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId 
              << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
 
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
     autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
-    if (autoDeleteTimeout)
-        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+    if (autoDeleteTimeout) 
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
 
-    if (mgmtObject != 0) {
+    if (mgmtObject != 0)
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
-    }
 
-    QueueFlowLimit::observe(*this, _settings);
+    if ( isDurable() && ! getPersistenceId() && ! recovering )
+      store->create(*this, _settings);
 }
 
-void Queue::destroyed()
+void Queue::destroy()
 {
-    unbind(broker->getExchanges());
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
         while(!messages->empty()){
             DeliverableMessage msg(messages->front().payload);
-            alternateExchange->routeWithAlternate(msg);
-            popAndDequeue(locker);
+            alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
+                                     msg.getMessage().getApplicationHeaders());
+            popAndDequeue();
         }
         alternateExchange->decAlternateUsers();
     }
@@ -1085,7 +846,6 @@ void Queue::destroyed()
         store = 0;//ensure we make no more calls to the store for this queue
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
-    notifyDeleted();
 }
 
 void Queue::notifyDeleted()
@@ -1105,9 +865,9 @@ void Queue::bound(const string& exchange
     bindings.add(exchange, key, args);
 }
 
-void Queue::unbind(ExchangeRegistry& exchanges)
+void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
 {
-    bindings.unbind(exchanges, shared_from_this());
+    bindings.unbind(exchanges, shared_ref);
 }
 
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -1120,9 +880,9 @@ const QueuePolicy* Queue::getPolicy()
     return policy.get();
 }
 
-uint64_t Queue::getPersistenceId() const
-{
-    return persistenceId;
+uint64_t Queue::getPersistenceId() const 
+{ 
+    return persistenceId; 
 }
 
 void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -1136,11 +896,11 @@ void Queue::setPersistenceId(uint64_t _p
     persistenceId = _persistenceId;
 }
 
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const 
 {
     buffer.putShortString(name);
     buffer.put(settings);
-    if (policy.get()) {
+    if (policy.get()) { 
         buffer.put(*policy);
     }
     buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -1154,14 +914,13 @@ uint32_t Queue::encodedSize() const
         + (policy.get() ? (*policy).encodedSize() : 0);
 }
 
-Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
+Queue::shared_ptr Queue::decode ( QueueRegistry& queues, Buffer& buffer, bool recovering )
 {
     string name;
     buffer.getShortString(name);
-    FieldTable settings;
-    buffer.get(settings);
-    boost::shared_ptr<Exchange> alternate;
-    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true);
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+    buffer.get(result.first->settings);
+    result.first->configure(result.first->settings, recovering );
     if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
         buffer.get ( *(result.first->policy) );
     }
@@ -1193,10 +952,11 @@ boost::shared_ptr<Exchange> Queue::getAl
 
 void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
-    if (broker.getQueues().destroyIf(queue->getName(),
+    if (broker.getQueues().destroyIf(queue->getName(), 
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
-        queue->destroyed();
+        queue->unbind(broker.getExchanges(), queue);
+        queue->destroy();
     }
 }
 
@@ -1205,7 +965,7 @@ struct AutoDeleteTask : qpid::sys::Timer
     Broker& broker;
     Queue::shared_ptr queue;
 
-    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
         : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
 
     void fire()
@@ -1223,27 +983,27 @@ void Queue::tryAutoDelete(Broker& broker
     if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
         queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
-        broker.getClusterTimer().add(queue->autoDeleteTask);
+        broker.getClusterTimer().add(queue->autoDeleteTask);        
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
     } else {
         tryAutoDeleteImpl(broker, queue);
     }
 }
 
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
+{ 
     Mutex::ScopedLock locker(ownershipLock);
-    return o == owner;
+    return o == owner; 
 }
 
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership() 
+{ 
     Mutex::ScopedLock locker(ownershipLock);
-    owner = 0;
+    owner = 0; 
 }
 
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
+{ 
     //reset auto deletion timer if necessary
     if (autoDeleteTimeout && autoDeleteTask) {
         autoDeleteTask->cancel();
@@ -1252,25 +1012,25 @@ bool Queue::setExclusiveOwner(const Owne
     if (owner) {
         return false;
     } else {
-        owner = o;
+        owner = o; 
         return true;
     }
 }
 
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const 
+{ 
     Mutex::ScopedLock locker(ownershipLock);
-    return owner != 0;
+    return owner != 0; 
 }
 
-bool Queue::hasExclusiveConsumer() const
-{
-    return exclusive;
+bool Queue::hasExclusiveConsumer() const 
+{ 
+    return exclusive; 
 }
 
 void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
-    if (externalQueueStore!=inst && externalQueueStore)
-        delete externalQueueStore;
+    if (externalQueueStore!=inst && externalQueueStore) 
+        delete externalQueueStore; 
     externalQueueStore = inst;
 
     if (inst) {
@@ -1295,7 +1055,7 @@ Manageable::status_t Queue::ManagementMe
     case _qmf::Queue::METHOD_PURGE :
         {
             _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
-            purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
+            purge(purgeArgs.i_request);
             status = Manageable::STATUS_OK;
         }
         break;
@@ -1316,7 +1076,7 @@ Manageable::status_t Queue::ManagementMe
                 }
             }
 
-            purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
+            purge(rerouteArgs.i_request, dest);
             status = Manageable::STATUS_OK;
         }
         break;
@@ -1325,14 +1085,6 @@ Manageable::status_t Queue::ManagementMe
     return status;
 }
 
-
-void Queue::query(qpid::types::Variant::Map& results) const
-{
-    Mutex::ScopedLock locker(messageLock);
-    /** @todo add any interesting queue state into results */
-    if (allocator) allocator->query(results);
-}
-
 void Queue::setPosition(SequenceNumber n) {
     Mutex::ScopedLock locker(messageLock);
     sequence = n;
@@ -1367,10 +1119,7 @@ void Queue::insertSequenceNumbers(const 
     QPID_LOG(debug, "Inserting sequence numbers as " << key);
 }
 
-/** updates queue observers and state when a message has become available for transfer,
- * expects messageLock to be held
- */
-void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
+void Queue::enqueued(const QueuedMessage& m)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1393,8 +1142,7 @@ void Queue::updateEnqueued(const QueuedM
         if (policy.get()) {
             policy->recoverEnqueued(payload);
         }
-        Mutex::ScopedLock locker(messageLock);
-        observeEnqueue(m, locker);
+        enqueued(m);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1418,7 +1166,6 @@ void Queue::checkNotDeleted()
 
 void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
 {
-    Mutex::ScopedLock locker(messageLock);
     observers.insert(observer);
 }
 
@@ -1428,32 +1175,6 @@ void Queue::flush()
     if (u.acquired && store) store->flush(*this);
 }
 
-
-bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
-                 const qpid::framing::FieldTable& arguments)
-{
-    if (exchange->bind(shared_from_this(), key, &arguments)) {
-        bound(exchange->getName(), key, arguments);
-        if (exchange->isDurable() && isDurable()) {
-            store->bind(*exchange, *this, key, arguments);
-        }
-        return true;
-    } else {
-        return false;
-    }
-}
-
-
-const Broker* Queue::getBroker()
-{
-    return broker;
-}
-
-void Queue::setDequeueSincePurge(uint32_t value) {
-    dequeueSincePurge = value;
-}
-
-
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
 #include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
-#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
@@ -59,7 +59,7 @@ class MessageStore;
 class QueueEvents;
 class QueueRegistry;
 class TransactionContext;
-class MessageDistributor;
+class Exchange;
 
 /**
  * The brokers representation of an amqp queue. Messages are
@@ -74,13 +74,13 @@ class Queue : public boost::enable_share
     {
         Queue& parent;
         uint count;
-
+                
         UsageBarrier(Queue&);
         bool acquire();
         void release();
         void destroy();
     };
-
+            
     struct ScopedUse
     {
         UsageBarrier& barrier;
@@ -88,7 +88,7 @@ class Queue : public boost::enable_share
         ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
-
+            
     typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
@@ -119,7 +119,7 @@ class Queue : public boost::enable_share
     boost::shared_ptr<Exchange> alternateExchange;
     framing::SequenceNumber sequence;
     qmf::org::apache::qpid::broker::Queue* mgmtObject;
-    sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
+    RateTracker dequeueTracker;
     int eventMode;
     Observers observers;
     bool insertSeqNo;
@@ -129,36 +129,26 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
-    boost::shared_ptr<MessageDistributor> allocator;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
-    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
-    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
-    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+    bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
+    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
     void notifyListener();
 
     void removeListener(Consumer::shared_ptr);
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
-    /** update queue observers, stats, policy, etc when the messages' state changes. Lock
-     * must be held by caller */
-    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-
-    /** modify the Queue's message container - assumes messageLock held */
-    void pop(const sys::Mutex::ScopedLock& held);           // acquire front msg
-    void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
-    // acquire message @ position, return true and set msg if acquire succeeds
-    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
-                 const sys::Mutex::ScopedLock& held);
-
+    void enqueued(const QueuedMessage& msg);
+    void dequeued(const QueuedMessage& msg);
+    void pop();
+    void popAndDequeue();
+    QueuedMessage getFront();
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
-    void configureImpl(const qpid::framing::FieldTable& settings);
 
     inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
     {
@@ -182,9 +172,8 @@ class Queue : public boost::enable_share
             }
         }
     }
-
+            
     void checkNotDeleted();
-    void notifyDeleted();
 
   public:
 
@@ -193,50 +182,29 @@ class Queue : public boost::enable_share
     typedef std::vector<shared_ptr> vector;
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
-                             bool autodelete = false,
-                             MessageStore* const store = 0,
+                             bool autodelete = false, 
+                             MessageStore* const store = 0, 
                              const OwnershipToken* const owner = 0,
                              management::Manageable* parent = 0,
                              Broker* broker = 0);
     QPID_BROKER_EXTERN ~Queue();
 
-    /** allow the Consumer to consume or browse the next available message */
     QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
 
-    /** allow the Consumer to acquire a message that it has browsed.
-     * @param msg - message to be acquired.
-     * @return false if message is no longer available for acquire.
-     */
-    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
-
-    /**
-     * Used to configure a new queue and create a persistent record
-     * for it in store if required.
-     */
-    QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
+    void create(const qpid::framing::FieldTable& settings);
 
-    /**
-     * Used to reconfigure a recovered queue (does not create
-     * persistent record in store).
-     */
-    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
-    void destroyed();
+    // "recovering" means we are doing a MessageStore recovery.
+    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
+                                      bool recovering = false);
+    void destroy();
+    void notifyDeleted();
     QPID_BROKER_EXTERN void bound(const std::string& exchange,
                                   const std::string& key,
                                   const qpid::framing::FieldTable& args);
-    //TODO: get unbind out of the public interface; only there for purposes of one unit test
-    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges);
-    /**
-     * Bind self to specified exchange, and record that binding for unbinding on delete.
-     */
-    bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
-              const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
+    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
+                                   Queue::shared_ptr shared_ref);
 
-    /** Acquire the message at the given position if it is available for acquire.  Not to
-     * be used by clients, but used by the broker for queue management.
-     * @param message - set to the acquired message if true returned.
-     * @return true if the message has been acquired.
-     */
+    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
     QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
 
     /**
@@ -265,14 +233,11 @@ class Queue : public boost::enable_share
                                     bool exclusive = false);
     QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
 
-    uint32_t purge(const uint32_t purge_request=0,  //defaults to all messages
-                   boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
-                   const ::qpid::types::Variant::Map *filter=0);
-    QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
+    uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages 
+    QPID_BROKER_EXTERN void purgeExpired();
 
     //move qty # of messages to destination Queue destq
-    uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
-                  const qpid::types::Variant::Map *filter=0);
+    uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
 
     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
     QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -311,8 +276,8 @@ class Queue : public boost::enable_share
      * Inform queue of messages that were enqueued, have since
      * been acquired but not yet accepted or released (and
      * thus are still logically on the queue) - used in
-     * clustered broker.
-     */
+     * clustered broker.  
+     */ 
     void updateEnqueued(const QueuedMessage& msg);
 
     /**
@@ -323,14 +288,14 @@ class Queue : public boost::enable_share
      * accepted it).
      */
     bool isEnqueued(const QueuedMessage& msg);
-
+            
     /**
-     * Acquires the next available (oldest) message
+     * Gets the next available message 
      */
     QPID_BROKER_EXTERN QueuedMessage get();
 
-    /** Get the message at position pos, returns true if found and sets msg */
-    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
+    /** Get the message at position pos */
+    QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
 
     const QueuePolicy* getPolicy();
 
@@ -344,13 +309,8 @@ class Queue : public boost::enable_share
     void encode(framing::Buffer& buffer) const;
     uint32_t encodedSize() const;
 
-    /**
-     * Restores a queue from encoded data (used in recovery)
-     *
-     * Note: restored queue will be neither auto-deleted or have an
-     * exclusive owner
-     */
-    static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
+    // "recovering" means we are doing a MessageStore recovery.
+    static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false );
     static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
 
     virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -359,7 +319,6 @@ class Queue : public boost::enable_share
     management::ManagementObject* GetManagementObject (void) const;
     management::Manageable::status_t
     ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
-    void query(::qpid::types::Variant::Map&) const;
 
     /** Apply f to each Message on the queue. */
     template <class F> void eachMessage(F f) {
@@ -372,11 +331,6 @@ class Queue : public boost::enable_share
         bindings.eachBinding(f);
     }
 
-    /** Apply f to each Observer on the queue */
-    template <class F> void eachObserver(F f) {
-        std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
-    }
-
     /** Set the position sequence number  for the next message on the queue.
      * Must be >= the current sequence number.
      * Used by cluster to replicate queues.
@@ -404,11 +358,6 @@ class Queue : public boost::enable_share
     void recoverPrepared(boost::intrusive_ptr<Message>& msg);
 
     void flush();
-
-    const Broker* getBroker();
-
-    uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
-    void setDequeueSincePurge(uint32_t value);
 };
 }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
 namespace qpid {
 namespace broker {
 
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
 
 QueueCleaner::~QueueCleaner()
 {
@@ -36,16 +36,10 @@ QueueCleaner::~QueueCleaner()
 
 void QueueCleaner::start(qpid::sys::Duration p)
 {
-    period = p;
     task = new Task(*this, p);
-    timer->add(task);
+    timer.add(task);
 }
 
-void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
-    this->timer = timer;
-}
-
-
 QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
 
 void QueueCleaner::Task::fire()
@@ -71,9 +65,9 @@ void QueueCleaner::fired()
     std::vector<Queue::shared_ptr> copy;
     CollectQueues collect(&copy);
     queues.eachQueue(collect);
-    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
+    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
     task->setupNextFire();
-    timer->add(task);
+    timer.add(task);
 }
 
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,15 +35,14 @@ class QueueRegistry;
 class QueueCleaner
 {
   public:
-    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
     QPID_BROKER_EXTERN ~QueueCleaner();
-    QPID_BROKER_EXTERN void start(sys::Duration period);
-    QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
+    QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
   private:
     class Task : public sys::TimerTask
     {
       public:
-        Task(QueueCleaner& parent, sys::Duration duration);
+        Task(QueueCleaner& parent, qpid::sys::Duration duration);
         void fire();
       private:
         QueueCleaner& parent;
@@ -51,8 +50,7 @@ class QueueCleaner
 
     boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
-    sys::Timer* timer;
-    sys::Duration period;
+    sys::Timer& timer;
 
     void fired();
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp Fri Oct 21 01:19:00 2011
@@ -129,10 +129,6 @@ class EventGenerator : public QueueObser
     {
         if (!enqueueOnly) manager.dequeued(m);
     }
-
-    void acquired(const QueuedMessage&) {};
-    void requeued(const QueuedMessage&) {};
-
   private:
     QueueEvents& manager;
     const bool enqueueOnly;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp Fri Oct 21 01:19:00 2011
@@ -26,25 +26,19 @@ namespace broker {
 
 void QueueListeners::addListener(Consumer::shared_ptr c)
 {
-    if (!c->inListeners) {
-        if (c->acquires) {
-            add(consumers, c);
-        } else {
-            add(browsers, c);
-        }
-        c->inListeners = true;
+    if (c->preAcquires()) {
+        add(consumers, c);
+    } else {
+        add(browsers, c);
     }
 }
 
 void QueueListeners::removeListener(Consumer::shared_ptr c)
 {
-    if (c->inListeners) {
-        if (c->acquires) {
-            remove(consumers, c);
-        } else {
-            remove(browsers, c);
-        }
-        c->inListeners = false;
+    if (c->preAcquires()) {
+        remove(consumers, c);
+    } else {
+        remove(browsers, c);
     }
 }
 
@@ -52,20 +46,18 @@ void QueueListeners::populate(Notificati
 {
     if (consumers.size()) {
         set.consumer = consumers.front();
-        consumers.pop_front();
-        set.consumer->inListeners = false;
+        consumers.erase(consumers.begin());
     } else {
-        // Don't swap the deques, hang on to the memory allocated.
+        // Don't swap the vectors, hang on to the memory allocated.
         set.browsers = browsers;
         browsers.clear();
-        for (Listeners::iterator i = set.browsers.begin(); i != set.browsers.end(); i++)
-            (*i)->inListeners = false;
     }
 }
 
 void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c)
 {
-    listeners.push_back(c);
+    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+    if (i == listeners.end()) listeners.push_back(c);
 }
 
 void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c)
@@ -81,7 +73,9 @@ void QueueListeners::NotificationSet::no
 }
 
 bool QueueListeners::contains(Consumer::shared_ptr c) const {
-    return c->inListeners;
+    return
+        std::find(browsers.begin(), browsers.end(), c) != browsers.end() ||
+        std::find(consumers.begin(), consumers.end(), c) != consumers.end();
 }
 
 void QueueListeners::ListenerSet::notifyAll()

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h Fri Oct 21 01:19:00 2011
@@ -22,7 +22,7 @@
  *
  */
 #include "qpid/broker/Consumer.h"
-#include <deque>
+#include <vector>
 
 namespace qpid {
 namespace broker {
@@ -40,7 +40,7 @@ namespace broker {
 class QueueListeners
 {
   public:
-    typedef std::deque<Consumer::shared_ptr> Listeners;
+    typedef std::vector<Consumer::shared_ptr> Listeners;
 
     class NotificationSet
     {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h Fri Oct 21 01:19:00 2011
@@ -24,52 +24,18 @@
 namespace qpid {
 namespace broker {
 
-struct QueuedMessage;
-class Consumer;
-
+class QueuedMessage;
 /**
- * Interface for notifying classes who want to act as 'observers' of a queue of particular
- * events.
- *
- * The events that are monitored reflect the relationship between a particular message and
- * the queue it has been delivered to.  A message can be considered in one of three states
- * with respect to the queue:
- *
- * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
- *
- * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
- * (by either browse or acquire), but still considered on the queue.
- *
- * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
- *
- * The queue events that are observable are:
- *
- * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
- * (e.g. browse or acquire)
- *
- * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available
- * for other consumers to browse or acquire, but it is not yet considered dequeued as it
- * may be requeued by the consumer.
- *
- * "Requeued" - a previously-acquired message is released by its owner: it is put back on
- * the queue at its original position and returns to the "Available" state.
- *
- * "Dequeued" - a message is no longer queued.  At this point, the queue no longer tracks
- * the message, and the broker considers the consumer's transaction complete.
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
  */
 class QueueObserver
 {
   public:
     virtual ~QueueObserver() {}
-
-    // note: the Queue will hold the messageLock while calling these methods!
     virtual void enqueued(const QueuedMessage&) = 0;
     virtual void dequeued(const QueuedMessage&) = 0;
-    virtual void acquired(const QueuedMessage&) = 0;
-    virtual void requeued(const QueuedMessage&) = 0;
-    virtual void consumerAdded( const Consumer& ) {};
-    virtual void consumerRemoved( const Consumer& ) {};
- private:
+  private:
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp Fri Oct 21 01:19:00 2011
@@ -117,30 +117,30 @@ void QueuePolicy::update(FieldTable& set
     settings.setString(typeKey, type);
 }
 
-template <typename T>
-T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue)
+uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
 {
     FieldTable::ValuePtr v = settings.get(key);
 
-    T result = 0;
+    int32_t result = 0;
 
     if (!v) return defaultValue;
     if (v->getType() == 0x23) {
         QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
     } else if (v->getType() == 0x33) {
         QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
-    } else if (v->convertsTo<T>()) {
-        result = v->get<T>();
+    } else if (v->convertsTo<int>()) {
+        result = v->get<int>();
         QPID_LOG(debug, "Got integer value for " << key << ": " << result);
         if (result >= 0) return result;
     } else if (v->convertsTo<string>()) {
         string s(v->get<string>());
         QPID_LOG(debug, "Got string value for " << key << ": " << s);
         std::istringstream convert(s);
-        if (convert >> result && result >= 0 && convert.eof()) return result;
+        if (convert >> result && result >= 0) return result;
     }
 
-    throw IllegalArgumentException(QPID_MSG("Cannot convert " << key << " to unsigned integer: " << *v));
+    QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
+    return defaultValue;
 }
 
 std::string QueuePolicy::getType(const FieldTable& settings)
@@ -247,7 +247,7 @@ bool RingQueuePolicy::checkLimit(boost::
 {
 
     // If the message is bigger than the queue size, give up
-    if (getMaxSize() && m->contentSize() > getMaxSize()) {
+    if (m->contentSize() > getMaxSize()) {
         QPID_LOG(debug, "Message too large for ring queue " << name 
                  << " [" << *this  << "] "
                  << ": message size = " << m->contentSize() << " bytes"
@@ -269,7 +269,8 @@ bool RingQueuePolicy::checkLimit(boost::
 
     do {
         QueuedMessage oldest  = queue.front();
-        if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
+
+        if (oldest.queue->acquire(oldest) || !strict) {
             queue.pop_front();
             pendingDequeues.push_back(oldest);
             QPID_LOG(debug, "Ring policy triggered in " << name 
@@ -319,8 +320,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::
 
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
 {
-    uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0);
-    uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize);
+    uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
+    uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
     if (maxCount || maxSize) {
         return createQueuePolicy(name, maxCount, maxSize, getType(settings));
     } else {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h Fri Oct 21 01:19:00 2011
@@ -43,7 +43,8 @@ class QueuePolicy
     uint32_t count;
     uint64_t size;
     bool policyExceeded;
-
+            
+    static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
 
   protected:
     uint64_t getCurrentQueueSize() const { return size; } 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp Fri Oct 21 01:19:00 2011
@@ -21,7 +21,6 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/QueueEvents.h"
-#include "qpid/broker/Exchange.h"
 #include "qpid/log/Statement.h"
 #include <sstream>
 #include <assert.h>
@@ -37,13 +36,7 @@ QueueRegistry::~QueueRegistry(){}
 
 std::pair<Queue::shared_ptr, bool>
 QueueRegistry::declare(const string& declareName, bool durable, 
-                       bool autoDelete, const OwnershipToken* owner,
-                       boost::shared_ptr<Exchange> alternate,
-                       const qpid::framing::FieldTable& arguments,
-                       bool recovering/*true if this declare is a
-                                        result of recovering queue
-                                        definition from persistente
-                                        record*/)
+                       bool autoDelete, const OwnershipToken* owner)
 {
     RWlock::ScopedWlock locker(lock);
     string name = declareName.empty() ? generateName() : declareName;
@@ -52,17 +45,6 @@ QueueRegistry::declare(const string& dec
 
     if (i == queues.end()) {
         Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
-        if (alternate) {
-            queue->setAlternateExchange(alternate);//need to do this *before* create
-            alternate->incAlternateUsers();
-        }
-        if (!recovering) {
-            //apply settings & create persistent record if required
-            queue->create(arguments);
-        } else {
-            //i.e. recovering a queue for which we already have a persistent record
-            queue->configure(arguments);
-        }
         queues[name] = queue;
         if (lastNode) queue->setLastNodeFailure();
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/management/Manageable.h"
-#include "qpid/framing/FieldTable.h"
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
 #include <algorithm>
@@ -35,7 +34,6 @@ namespace broker {
 
 class Queue;
 class QueueEvents;
-class Exchange;
 class OwnershipToken;
 class Broker;
 class MessageStore;
@@ -62,10 +60,7 @@ class QueueRegistry {
         const std::string& name,
         bool durable = false,
         bool autodelete = false, 
-        const OwnershipToken* owner = 0,
-        boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
-        const qpid::framing::FieldTable& args = framing::FieldTable(),
-        bool recovering = false);
+        const OwnershipToken* owner = 0);
 
     /**
      * Destroy the named queue.

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp Fri Oct 21 01:19:00 2011
@@ -43,6 +43,7 @@ void RecoveredDequeue::commit() throw()
 
 void RecoveredDequeue::rollback() throw()
 {
+    msg->enqueueComplete();
     queue->process(msg);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp Fri Oct 21 01:19:00 2011
@@ -36,6 +36,7 @@ bool RecoveredEnqueue::prepare(Transacti
 }
 
 void RecoveredEnqueue::commit() throw(){
+    msg->enqueueComplete();
     queue->process(msg);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Oct 21 01:19:00 2011
@@ -113,7 +113,7 @@ RecoverableExchange::shared_ptr Recovery
 
 RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
 {
-    Queue::shared_ptr queue = Queue::restore(queues, buffer);
+    Queue::shared_ptr queue = Queue::decode(queues, buffer, true);
     try {
         Exchange::shared_ptr exchange = exchanges.getDefault();
         if (exchange) {
@@ -252,6 +252,7 @@ void RecoverableMessageImpl::dequeue(Dtx
 
 void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
 {
+    msg->enqueueComplete(); // recoved nmessage to enqueued in store already
     buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp Fri Oct 21 01:19:00 2011
@@ -30,7 +30,6 @@
 #include <boost/format.hpp>
 
 #if HAVE_SASL
-#include <sys/stat.h>
 #include <sasl/sasl.h>
 #include "qpid/sys/cyrus/CyrusSecurityLayer.h"
 using qpid::sys::cyrus::CyrusSecurityLayer;
@@ -58,7 +57,7 @@ public:
     NullAuthenticator(Connection& connection, bool encrypt);
     ~NullAuthenticator();
     void getMechanisms(framing::Array& mechanisms);
-    void start(const std::string& mechanism, const std::string* response);
+    void start(const std::string& mechanism, const std::string& response);
     void step(const std::string&) {}
     std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
 };
@@ -82,7 +81,7 @@ public:
     ~CyrusAuthenticator();
     void init();
     void getMechanisms(framing::Array& mechanisms);
-    void start(const std::string& mechanism, const std::string* response);
+    void start(const std::string& mechanism, const std::string& response);
     void step(const std::string& response);
     void getError(std::string& error);
     void getUid(std::string& uid) { getUsername(uid); }
@@ -99,33 +98,11 @@ void SaslAuthenticator::init(const std::
     //  Check if we have a version of SASL that supports sasl_set_path()
 #if (SASL_VERSION_FULL >= ((2<<16)|(1<<8)|22))
     //  If we are not given a sasl path, do nothing and allow the default to be used.
-    if ( saslConfigPath.empty() ) {
-        QPID_LOG ( info, "SASL: no config path set - using default." );
-    }
-    else {
-        struct stat st;
-
-        // Make sure the directory exists and we can read up to it.
-        if ( ::stat ( saslConfigPath.c_str(), & st) ) {
-          // Note: not using strerror() here because I think its messages are a little too hazy.
-          if ( errno == ENOENT )
-              throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: no such directory: " << saslConfigPath ) );
-          if ( errno == EACCES )
-              throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot read parent of: " << saslConfigPath ) );
-          // catch-all stat failure
-          throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot stat: " << saslConfigPath ) );
-        }
-
-        // Make sure the directory is readable.
-        if ( ::access ( saslConfigPath.c_str(), R_OK ) ) {
-            throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: directory not readable:" << saslConfigPath ) );
-        }
-
-        // This shouldn't fail now, but check anyway.
-        int code = sasl_set_path(SASL_PATH_TYPE_CONFIG, const_cast<char *>(saslConfigPath.c_str()));
+    if ( ! saslConfigPath.empty() ) {
+        int code = sasl_set_path(SASL_PATH_TYPE_CONFIG,
+                                 const_cast<char *>(saslConfigPath.c_str()));
         if(SASL_OK != code)
             throw Exception(QPID_MSG("SASL: sasl_set_path failed [" << code << "] " ));
-
         QPID_LOG(info, "SASL: config path set to " << saslConfigPath );
     }
 #endif
@@ -187,7 +164,7 @@ void NullAuthenticator::getMechanisms(Ar
     mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing
 }
 
-void NullAuthenticator::start(const string& mechanism, const string* response)
+void NullAuthenticator::start(const string& mechanism, const string& response)
 {
     if (encrypt) {
 #if HAVE_SASL
@@ -203,16 +180,16 @@ void NullAuthenticator::start(const stri
         }
     }
     if (mechanism == "PLAIN") { // Old behavior
-        if (response && response->size() > 0) {
+        if (response.size() > 0) {
             string uid;
-            string::size_type i = response->find((char)0);
-            if (i == 0 && response->size() > 1) {
+            string::size_type i = response.find((char)0);
+            if (i == 0 && response.size() > 1) {
                 //no authorization id; use authentication id
-                i = response->find((char)0, 1);
-                if (i != string::npos) uid = response->substr(1, i-1);
+                i = response.find((char)0, 1);
+                if (i != string::npos) uid = response.substr(1, i-1);
             } else if (i != string::npos) {
                 //authorization id is first null delimited field
-                uid = response->substr(0, i);
+                uid = response.substr(0, i);
             }//else not a valid SASL PLAIN response, throw error?            
             if (!uid.empty()) {
                 //append realm if it has not already been added
@@ -399,22 +376,18 @@ void CyrusAuthenticator::getMechanisms(A
     }
 }
 
-void CyrusAuthenticator::start(const string& mechanism, const string* response)
+void CyrusAuthenticator::start(const string& mechanism, const string& response)
 {
     const char *challenge;
     unsigned int challenge_len;
     
-    // This should be at same debug level as mech list in getMechanisms().
-    QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
+    QPID_LOG(debug, "SASL: Starting authentication with mechanism: " << mechanism);
     int code = sasl_server_start(sasl_conn,
                                  mechanism.c_str(),
-                                 (response ? response->c_str() : 0), (response ? response->size() : 0),
+                                 response.c_str(), response.length(),
                                  &challenge, &challenge_len);
     
     processAuthenticationStep(code, challenge, challenge_len);
-    qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
-    if ( cnxMgmt ) 
-        cnxMgmt->set_saslMechanism(mechanism);
 }
         
 void CyrusAuthenticator::step(const string& response)
@@ -451,12 +424,10 @@ void CyrusAuthenticator::processAuthenti
         client.secure(challenge_str);
     } else {
         std::string uid;
-        //save error detail before trying to retrieve username as error in doing so will overwrite it
-        std::string errordetail = sasl_errdetail(sasl_conn);
         if (!getUsername(uid)) {
-            QPID_LOG(info, "SASL: Authentication failed (no username available yet):" << errordetail);
+            QPID_LOG(info, "SASL: Authentication failed (no username available):" << sasl_errdetail(sasl_conn));
         } else {
-            QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << errordetail);
+            QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << sasl_errdetail(sasl_conn));
         }
 
         // TODO: Change to more specific exceptions, when they are
@@ -488,9 +459,6 @@ std::auto_ptr<SecurityLayer> CyrusAuthen
     if (ssf) {
         securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
     }
-    qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
-    if ( cnxMgmt ) 
-        cnxMgmt->set_saslSsf(ssf);
     return securityLayer;
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h Fri Oct 21 01:19:00 2011
@@ -41,7 +41,7 @@ class SaslAuthenticator
 public:
     virtual ~SaslAuthenticator() {}
     virtual void getMechanisms(framing::Array& mechanisms) = 0;
-    virtual void start(const std::string& mechanism, const std::string* response) = 0;
+    virtual void start(const std::string& mechanism, const std::string& response) = 0;
     virtual void step(const std::string& response) = 0;
     virtual void getUid(std::string&) {}
     virtual bool getUsername(std::string&) { return false; };



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org