You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [4/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Aug 10 12:04:27 2012
@@ -20,23 +20,23 @@
  */
 
 #include "qpid/broker/Queue.h"
-
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/broker/QueueDepth.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/Exchange.h"
-#include "qpid/broker/Fairshare.h"
 #include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/LegacyLVQ.h"
-#include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/MessageMap.h"
 #include "qpid/broker/MessageStore.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageDistributor.h"
+#include "qpid/broker/FifoDistributor.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueFlowLimit.h"
-#include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/MessageGroupManager.h"
 
+//TODO: get rid of this
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
@@ -76,26 +76,8 @@ namespace _qmf = qmf::org::apache::qpid:
 
 namespace
 {
-const std::string qpidMaxSize("qpid.max_size");
-const std::string qpidMaxCount("qpid.max_count");
-const std::string qpidNoLocal("no-local");
-const std::string qpidTraceIdentity("qpid.trace.id");
-const std::string qpidTraceExclude("qpid.trace.exclude");
-const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
-const std::string qpidLastValueQueue("qpid.last_value_queue");
-const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
-const std::string qpidPersistLastNode("qpid.persist_last_node");
-const std::string qpidVQMatchProperty("qpid.LVQ_key");
-const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
-const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
-//following feature is not ready for general use as it doesn't handle
-//the case where a message is enqueued on more than one queue well enough:
-const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
-
-const int ENQUEUE_ONLY=1;
-const int ENQUEUE_AND_DEQUEUE=2;
 
-inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+inline void mgntEnqStats(const Message& msg,
 			 _qmf::Queue* mgmtObject,
 			 _qmf::Broker* brokerMgmtObject)
 {
@@ -103,12 +85,12 @@ inline void mgntEnqStats(const boost::in
         _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
         _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
 
-        uint64_t contentSize = msg->contentSize();
+        uint64_t contentSize = msg.getContentSize();
         qStats->msgTotalEnqueues +=1;
         bStats->msgTotalEnqueues += 1;
         qStats->byteTotalEnqueues += contentSize;
         bStats->byteTotalEnqueues += contentSize;
-        if (msg->isPersistent ()) {
+        if (msg.isPersistent ()) {
             qStats->msgPersistEnqueues += 1;
             bStats->msgPersistEnqueues += 1;
             qStats->bytePersistEnqueues += contentSize;
@@ -119,20 +101,20 @@ inline void mgntEnqStats(const boost::in
     }
 }
 
-inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+inline void mgntDeqStats(const Message& msg,
 			 _qmf::Queue* mgmtObject,
 			 _qmf::Broker* brokerMgmtObject)
 {
     if (mgmtObject != 0){
         _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
         _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
-        uint64_t contentSize = msg->contentSize();
+        uint64_t contentSize = msg.getContentSize();
 
         qStats->msgTotalDequeues += 1;
         bStats->msgTotalDequeues += 1;
         qStats->byteTotalDequeues += contentSize;
         bStats->byteTotalDequeues += contentSize;
-        if (msg->isPersistent ()){
+        if (msg.isPersistent ()){
             qStats->msgPersistDequeues += 1;
             bStats->msgPersistDequeues += 1;
             qStats->bytePersistDequeues += contentSize;
@@ -143,43 +125,81 @@ inline void mgntDeqStats(const boost::in
     }
 }
 
-} // namespace
+QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions)
+{
+    QueueSettings settings(inputs);
+    if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) {
+        settings.maxDepth.setSize(globalOptions.queueLimit);
+    }
+    return settings;
+}
+
+}
+
+Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {}
+bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
+{
+    try {
+        prepared = queue->enqueue(ctxt, message);
+        return true;
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Failed to prepare: " << e.what());
+        return false;
+    }
+}
+void Queue::TxPublish::commit() throw()
+{
+    try {
+        if (prepared) queue->process(message);
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Failed to commit: " << e.what());
+    }
+}
+void Queue::TxPublish::rollback() throw()
+{
+    try {
+        if (prepared) queue->enqueueAborted(message);
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Failed to rollback: " << e.what());
+    }
+}
 
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, const QueueSettings& _settings,
              MessageStore* const _store,
-             const OwnershipToken* const _owner,
              Manageable* parent,
              Broker* b) :
 
     name(_name),
-    autodelete(_autodelete),
     store(_store),
-    owner(_owner),
+    owner(0),
     consumerCount(0),
     browserCount(0),
     exclusive(0),
-    noLocal(false),
     persistLastNode(false),
     inLastNodeFailure(false),
     messages(new MessageDeque()),
     persistenceId(0),
-    policyExceeded(false),
+    settings(b ? merge(_settings, b->getOptions()) : _settings),
     mgmtObject(0),
     brokerMgmtObject(0),
     eventMode(0),
-    insertSeqNo(0),
     broker(b),
     deleted(false),
     barrier(*this),
-    autoDeleteTimeout(0),
     allocator(new FifoDistributor( *messages ))
 {
+    if (settings.maxDepth.hasCount()) current.setCount(0);
+    if (settings.maxDepth.hasSize()) current.setSize(0);
+    if (settings.traceExcludes.size()) {
+        split(traceExclude, settings.traceExcludes, ", ");
+    }
+    qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
 
         if (agent != 0) {
-            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete);
-            mgmtObject->set_exclusive(_owner != 0);
+            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+            mgmtObject->set_arguments(settings.asMap());
             agent->addObject(mgmtObject, 0, store != 0);
             brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
             if (brokerMgmtObject)
@@ -197,32 +217,36 @@ Queue::~Queue()
     }
 }
 
-bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+bool isLocalTo(const OwnershipToken* token, const Message& msg)
 {
-    return token && token->isLocal(msg->getPublisher());
+    return token && token->isLocal(msg.getPublisher());
 }
 
-bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+bool Queue::isLocal(const Message& msg)
 {
     //message is considered local if it was published on the same
     //connection as that of the session which declared this queue
     //exclusive (owner) or which has an exclusive subscription
     //(exclusive)
-    return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+    return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
 }
 
-bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
+bool Queue::isExcluded(const Message& msg)
 {
-    return traceExclude.size() && msg->isExcluded(traceExclude);
+    return traceExclude.size() && msg.isExcluded(traceExclude);
 }
 
-void Queue::deliver(boost::intrusive_ptr<Message> msg){
+void Queue::deliver(Message msg, TxBuffer* txn){
+    //TODO: move some of this out of the queue and into the publishing
+    //'link' for whatever protocol is used; that would let protocol
+    //specific stuff be kept out of the queue
+
     // Check for deferred delivery in a cluster.
     if (broker && broker->deferDelivery(name, msg))
         return;
-    if (msg->isImmediate() && getConsumerCount() == 0) {
+    if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
         if (alternateExchange) {
-            DeliverableMessage deliverable(msg);
+            DeliverableMessage deliverable(msg, 0);
             alternateExchange->route(deliverable);
         }
     } else if (isLocal(msg)) {
@@ -232,47 +256,38 @@ void Queue::deliver(boost::intrusive_ptr
         //drop message
         QPID_LOG(info, "Dropping excluded message from " << getName());
     } else {
-        enqueue(0, msg);
-        push(msg);
-        QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+        if (txn) {
+            TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
+            txn->enlist(op);
+        } else {
+            if (enqueue(0, msg)) {
+                push(msg);
+                QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+            } else {
+                QPID_LOG(debug, "Message " << msg << " dropped from " << name);
+            }
+        }
     }
 }
 
-void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
+void Queue::recoverPrepared(const Message& msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->recoverEnqueued(msg);
+    current += QueueDepth(1, msg.getContentSize());
 }
 
-void Queue::recover(boost::intrusive_ptr<Message>& msg)
+void Queue::recover(Message& msg)
 {
-    {
-        Mutex::ScopedLock locker(messageLock);
-        if (policy.get()) policy->recoverEnqueued(msg);
-    }
-
+    recoverPrepared(msg);
     push(msg, true);
-    if (store){
-        // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
-        msg->addToSyncList(shared_from_this(), store);
-    }
-
-    if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
-        //content has not been loaded, need to ensure that lazy loading mode is set:
-        //TODO: find a nicer way to do this
-        msg->releaseContent(store);
-        // NOTE: The log message in this section are used for flow-to-disk testing (which checks the log for the
-        // presence of this message). Do not change this without also checking these tests.
-        QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                        std::hex << msg->getPersistenceId() << std::dec << ": Content released after recovery");
-    }
 }
 
-void Queue::process(boost::intrusive_ptr<Message>& msg){
+void Queue::process(Message& msg)
+{
     push(msg);
     if (mgmtObject != 0){
         _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
-        const uint64_t contentSize = msg->contentSize();
+        const uint64_t contentSize = msg.getContentSize();
         qStats->msgTxnEnqueues  += 1;
         qStats->byteTxnEnqueues += contentSize;
         mgmtObject->statisticsUpdated();
@@ -285,46 +300,22 @@ void Queue::process(boost::intrusive_ptr
     }
 }
 
-void Queue::requeue(const QueuedMessage& msg){
+void Queue::release(const QueueCursor& position, bool markRedelivered)
+{
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
     {
-        if (!isEnqueued(msg)) return;
-        if (deleted) {
-            //
-            // If the queue has been deleted, requeued messages must be sent to the alternate exchange
-            // if one is configured.
-            //
-            if (alternateExchange.get()) {
-                DeliverableMessage dmsg(msg.payload);
-                alternateExchange->routeWithAlternate(dmsg);
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_abandonedViaAlt();
-            } else {
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_abandoned();
-            }
-            mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
-        } else {
-            {
-                Mutex::ScopedLock locker(messageLock);
-                messages->release(msg);
-                observeRequeue(msg, locker);
+        Mutex::ScopedLock locker(messageLock);
+        if (!deleted) {
+            Message* message = messages->release(position);
+            if (message) {
+                if (!markRedelivered) message->undeliver();
                 listeners.populate(copy);
-            }
-
-            if (mgmtObject) {
-                mgmtObject->inc_releases();
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_releases();
-            }
-
-            // for persistLastNode - don't force a message twice to disk, but force it if no force before
-            if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
-                msg.payload->forcePersistent();
-                if (msg.payload->isForcedPersistent() ){
-                    boost::intrusive_ptr<Message> payload = msg.payload;
-                    enqueue(0, payload);
+                observeRequeue(*message, locker);
+                if (mgmtObject) {
+                    mgmtObject->inc_releases();
+                    if (brokerMgmtObject)
+                        brokerMgmtObject->inc_releases();
                 }
             }
         }
@@ -332,163 +323,118 @@ void Queue::requeue(const QueuedMessage&
     copy.notify();
 }
 
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::dequeueMessageAt(const SequenceNumber& position)
 {
-    assertClusterSafe();
-    QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (acquire(position, message)) {
-        QPID_LOG(debug, "Acquired message at " << position << " from " << name);
-        return true;
-    } else {
-        QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
-        return false;
-    }
-}
-
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
-{
-    assertClusterSafe();
-    QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-    bool ok;
+    boost::intrusive_ptr<PersistableMessage> pmsg;
     {
         Mutex::ScopedLock locker(messageLock);
-        ok = allocator->allocate( consumer, msg );
-    }
-    if (!ok) {
-        QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
-        return false;
-    }
-
-    QueuedMessage copy(msg);
-    if (acquire( msg.position, copy)) {
-        QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
-        return true;
+        assertClusterSafe();
+        QPID_LOG(debug, "Attempting to dequeue message at " << position);
+        QueueCursor cursor;
+        Message* msg = messages->find(position, &cursor);
+        if (msg) {
+            if (msg->isPersistent()) pmsg = msg->getPersistentContext();
+            observeDequeue(*msg, locker);
+            messages->deleted(cursor);
+        } else {
+            QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
+            return false;
+        }
     }
-    QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
-    return false;
+    dequeueFromStore(pmsg);
+    return true;
 }
 
-void Queue::notifyListener()
+bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
 {
+    Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
-    QueueListeners::NotificationSet set;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        if (messages->size()) {
-            listeners.populate(set);
-        }
-    }
-    set.notify();
-}
+    Message* msg;
 
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
-    checkNotDeleted(c);
-    if (c->preAcquires()) {
-        switch (consumeNextMessage(m, c)) {
-          case CONSUMED:
-            return true;
-          case CANT_CONSUME:
-            notifyListener();//let someone else try
-          case NO_MESSAGES:
-          default:
+    msg = messages->find(position);
+    if (msg) {
+        QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence());
+        if (!allocator->acquire(consumer, *msg)) {
+            QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name);
             return false;
+        } else {
+            observeAcquire(*msg, locker);
+            QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name);
+            return true;
         }
     } else {
-        return browseNextMessage(m, c);
+        QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name);
+        return false;
     }
 }
 
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
 {
+    checkNotDeleted(c);
+    QueueListeners::NotificationSet set;
     while (true) {
-        QueuedMessage msg;
-        bool found;
-        {
-            Mutex::ScopedLock locker(messageLock);
-            found = allocator->nextConsumableMessage(c, msg);
-            if (!found) listeners.addListener(c);
-        }
-        if (!found) {
-            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
-            return NO_MESSAGES;
-        }
-
-        if (msg.payload->hasExpired()) {
-            QPID_LOG(debug, "Message expired from queue '" << name << "'");
-            c->setPosition(msg.position);
-            dequeue(0, msg);
-            if (mgmtObject) {
-                mgmtObject->inc_discardsTtl();
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_discardsTtl();
-            }
-            continue;
-        }
-
-        if (c->filter(msg.payload)) {
-            if (c->accept(msg.payload)) {
-                {
-                    Mutex::ScopedLock locker(messageLock);
-                    bool ok = allocator->allocate( c->getName(), msg );  // inform allocator
-                    (void) ok; assert(ok);
-                    observeAcquire(msg, locker);
-                }
+        //TODO: reduce lock scope
+        Mutex::ScopedLock locker(messageLock);
+        Message* msg = messages->next(*c);
+        if (msg) {
+            if (msg->hasExpired()) {
+                QPID_LOG(debug, "Message expired from queue '" << name << "'");
+                observeDequeue(*msg, locker);
+                //ERROR: don't hold lock across call to store!!
+                if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
                 if (mgmtObject) {
-                    mgmtObject->inc_acquires();
+                    mgmtObject->inc_discardsTtl();
                     if (brokerMgmtObject)
-                        brokerMgmtObject->inc_acquires();
+                        brokerMgmtObject->inc_discardsTtl();
                 }
-                m = msg;
-                return CONSUMED;
-            } else {
-                //message(s) are available but consumer hasn't got enough credit
-                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                messages->deleted(*c);
+                continue;
             }
-        } else {
-            //consumer will never want this message
-            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
-        }
-
-        Mutex::ScopedLock locker(messageLock);
-        messages->release(msg);
-        return CANT_CONSUME;
-    }
-}
 
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
-    while (true) {
-        QueuedMessage msg;
-        bool found;
-        {
-            Mutex::ScopedLock locker(messageLock);
-            found = allocator->nextBrowsableMessage(c, msg);
-            if (!found) listeners.addListener(c);
-        }
-        if (!found) { // no next available
-            QPID_LOG(debug, "No browsable messages available for consumer " <<
-                     c->getName() << " on queue '" << name << "'");
-            return false;
-        }
-
-        if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
-            if (c->accept(msg.payload)) {
-                //consumer wants the message
-                c->setPosition(msg.position);
-                m = msg;
-                return true;
+            if (c->filter(*msg)) {
+                if (c->accept(*msg)) {
+                    if (c->preAcquires()) {
+                        QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState());
+                        if (allocator->acquire(c->getName(), *msg)) {
+                            if (mgmtObject) {
+                                mgmtObject->inc_acquires();
+                                if (brokerMgmtObject)
+                                    brokerMgmtObject->inc_acquires();
+                            }
+                            observeAcquire(*msg, locker);
+                            msg->deliver();
+                        } else {
+                            QPID_LOG(debug, "Could not acquire message from '" << name << "'");
+                            continue; //try another message
+                        }
+                    }
+                    QPID_LOG(debug, "Message retrieved from '" << name << "'");
+                    m = *msg;
+                    return true;
+                } else {
+                    //message(s) are available but consumer hasn't got enough credit
+                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                    if (c->preAcquires()) {
+                        //let someone else try
+                        listeners.populate(set);
+                    }
+                    break;
+                }
             } else {
-                //browser hasn't got enough credit for the message
-                QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
-                return false;
+                //consumer will never want this message, try another one
+                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+                if (c->preAcquires()) {
+                    //let someone else try to take this one
+                    listeners.populate(set);
+                }
             }
         } else {
-            //consumer will never want this message, continue seeking
-            QPID_LOG(debug, "Browser skipping message from '" << name << "'");
-            c->setPosition(msg.position);
+            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+            listeners.addListener(c);
+            return false;
         }
     }
+    set.notify();
     return false;
 }
 
@@ -507,23 +453,28 @@ void Queue::removeListener(Consumer::sha
 
 bool Queue::dispatch(Consumer::shared_ptr c)
 {
-    QueuedMessage msg(this);
+    Message msg;
     if (getNextMessage(msg, c)) {
-        c->deliver(msg);
+        c->deliver(*c, msg);
         return true;
     } else {
         return false;
     }
 }
 
-bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
+bool Queue::find(SequenceNumber pos, Message& msg) const
+{
     Mutex::ScopedLock locker(messageLock);
-    if (messages->find(pos, msg))
+    Message* ptr = messages->find(pos, 0);
+    if (ptr) {
+        msg = *ptr;
         return true;
+    }
     return false;
 }
 
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
+{
     assertClusterSafe();
     {
         Mutex::ScopedLock locker(messageLock);
@@ -550,7 +501,7 @@ void Queue::consume(Consumer::shared_ptr
             browserCount++;
         consumerCount++;
         //reset auto deletion timer if necessary
-        if (autoDeleteTimeout && autoDeleteTask) {
+        if (settings.autoDeleteDelay && autoDeleteTask) {
             autoDeleteTask->cancel();
         }
         observeConsumerAdd(*c, locker);
@@ -559,7 +510,8 @@ void Queue::consume(Consumer::shared_ptr
         mgmtObject->inc_consumerCount ();
 }
 
-void Queue::cancel(Consumer::shared_ptr c){
+void Queue::cancel(Consumer::shared_ptr c)
+{
     removeListener(c);
     {
         Mutex::ScopedLock locker(messageLock);
@@ -572,65 +524,6 @@ void Queue::cancel(Consumer::shared_ptr 
         mgmtObject->dec_consumerCount ();
 }
 
-QueuedMessage Queue::get(){
-    QueuedMessage msg(this);
-    bool ok;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        ok = messages->consume(msg);
-        if (ok) observeAcquire(msg, locker);
-    }
-
-    if (ok && mgmtObject) {
-        mgmtObject->inc_acquires();
-        if (brokerMgmtObject)
-            brokerMgmtObject->inc_acquires();
-    }
-
-    return msg;
-}
-
-namespace {
-bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
-               std::deque<QueuedMessage>& collection)
-{
-    if (predicate(qm)) {
-        collection.push_back(qm);
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
-} // namespace
-
-void Queue::dequeueIf(Messages::Predicate predicate,
-                      std::deque<QueuedMessage>& dequeued)
-{
-    {
-        Mutex::ScopedLock locker(messageLock);
-        messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
-    }
-    if (!dequeued.empty()) {
-        if (mgmtObject) {
-            mgmtObject->inc_acquires(dequeued.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_acquires(dequeued.size());
-        }
-        for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
-             i != dequeued.end(); ++i) {
-            {
-                // KAG: should be safe to retake lock after the removeIf, since
-                // no other thread can touch these messages after the removeIf() call
-                Mutex::ScopedLock locker(messageLock);
-                observeAcquire(*i, locker);
-            }
-            dequeue( 0, *i );
-        }
-    }
-}
-
 /**
  *@param lapse: time since the last purgeExpired
  */
@@ -642,13 +535,17 @@ void Queue::purgeExpired(sys::Duration l
     dequeueSincePurge -= count;
     int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
     if (seconds == 0 || count / seconds < 1) {
-        std::deque<QueuedMessage> dequeued;
-        dequeueIf(boost::bind(&isExpired, _1), dequeued);
-        if (dequeued.size()) {
-            if (mgmtObject) {
-                mgmtObject->inc_discardsTtl(dequeued.size());
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_discardsTtl(dequeued.size());
+        uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER);
+        QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
+        //
+        // Report the count of discarded-by-ttl messages
+        //
+        if (mgmtObject && count) {
+            mgmtObject->inc_acquires(count);
+            mgmtObject->inc_discardsTtl(count);
+            if (brokerMgmtObject) {
+                brokerMgmtObject->inc_acquires(count);
+                brokerMgmtObject->inc_discardsTtl(count);
             }
         }
     }
@@ -663,7 +560,7 @@ namespace {
         static const std::string typeKey;
         static const std::string paramsKey;
         static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
-        virtual bool match( const QueuedMessage& ) const { return true; }
+        virtual bool match( const Message& ) const { return true; }
         virtual ~MessageFilter() {}
     protected:
         MessageFilter() {};
@@ -687,13 +584,9 @@ namespace {
         static const std::string valueKey;
         HeaderMatchFilter( const std::string& _header, const std::string& _value )
             : MessageFilter (), header(_header), value(_value) {}
-        bool match( const QueuedMessage& msg ) const
+        bool match( const Message& msg ) const
         {
-            const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
-            if (!headers) return false;
-            FieldTable::ValuePtr h = headers->get(header);
-            if (!h || !h->convertsTo<std::string>()) return false;
-            return h->get<std::string>() == value;
+            return msg.getPropertyAsString(header) == value;
         }
     private:
         const std::string header;
@@ -730,36 +623,68 @@ namespace {
         return new MessageFilter();
     }
 
-    // used by removeIf() to collect all messages matching a filter, maximum match count is
-    // optional.
-    struct Collector {
-        const uint32_t maxMatches;
-        MessageFilter& filter;
-        std::deque<QueuedMessage> matches;
-        Collector(MessageFilter& filter, uint32_t max)
-            : maxMatches(max), filter(filter) {}
-        bool operator() (QueuedMessage& qm)
-        {
-            if (maxMatches == 0 || matches.size() < maxMatches) {
-                if (filter.match( qm )) {
-                    matches.push_back(qm);
-                    return true;
-                }
-            }
+    bool reroute(boost::shared_ptr<Exchange> e, const Message& m)
+    {
+        if (e) {
+            DeliverableMessage d(m, 0);
+            d.getMessage().clearTrace();
+            e->routeWithAlternate(d);
+            return true;
+        } else {
             return false;
         }
-    };
-
+    }
+    void moveTo(boost::shared_ptr<Queue> q, Message& m)
+    {
+        if (q) {
+            q->deliver(m);
+        }
+    }
 } // end namespace
 
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type)
+{
+    std::deque<Message> removed;
+    {
+        QueueCursor c(type);
+        uint32_t count(0);
+        Mutex::ScopedLock locker(messageLock);
+        Message* m = messages->next(c);
+        while (m){
+            if (!p || p(*m)) {
+                if (!maxCount || count++ < maxCount) {
+                    if (m->getState() == AVAILABLE) {
+                        //don't actually acquire, just act as if we did
+                        observeAcquire(*m, locker);
+                    }
+                    observeDequeue(*m, locker);
+                    removed.push_back(*m);//takes a copy of the message
+                    if (!messages->deleted(c)) {
+                        QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
+                        assert(false);
+                    }
+                } else {
+                    break;
+                }
+            }
+            m = messages->next(c);
+        }
+    }
+    for (std::deque<Message>::iterator i = removed.begin(); i != removed.end(); ++i) {
+        if (f) f(*i);//ERROR? need to clear old persistent context?
+        if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing
+    }
+    return removed.size();
+}
+
 
 /**
  * purge - for purging all or some messages on a queue
  *         depending on the purge_request
  *
- * purge_request == 0 then purge all messages
- *               == N then purge N messages from queue
- * Sometimes purge_request == 1 to unblock the top of queue
+ * qty == 0 then purge all messages
+ *     == N then purge N messages from queue
+ * Sometimes qty == 1 to unblock the top of queue
  *
  * The dest exchange may be supplied to re-route messages through the exchange.
  * It is safe to re-route messages such that they arrive back on the same queue,
@@ -768,172 +693,53 @@ namespace {
  * An optional filter can be supplied that will be applied against each message.  The
  * message is purged only if the filter matches.  See MessageDistributor for more detail.
  */
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
                       const qpid::types::Variant::Map *filter)
 {
     std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
-    Collector c(*mf.get(), purge_request);
+    uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/);
 
-    {
-        Mutex::ScopedLock locker(messageLock);
-        messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
-    }
-
-    if (!c.matches.empty()) {
-        if (mgmtObject) {
-            mgmtObject->inc_acquires(c.matches.size());
-            if (dest.get()) {
-                mgmtObject->inc_reroutes(c.matches.size());
-                if (brokerMgmtObject) {
-                    brokerMgmtObject->inc_acquires(c.matches.size());
-                    brokerMgmtObject->inc_reroutes(c.matches.size());
-                }
-            } else {
-                mgmtObject->inc_discardsPurge(c.matches.size());
-                if (brokerMgmtObject) {
-                    brokerMgmtObject->inc_acquires(c.matches.size());
-                    brokerMgmtObject->inc_discardsPurge(c.matches.size());
-                }
-            }
-        }
-
-        for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-             qmsg != c.matches.end(); ++qmsg) {
-
-            {
-                // KAG: should be safe to retake lock after the removeIf, since
-                // no other thread can touch these messages after the removeIf call
-                Mutex::ScopedLock locker(messageLock);
-                observeAcquire(*qmsg, locker);
+    if (mgmtObject && count) {
+        mgmtObject->inc_acquires(count);
+        if (dest.get()) {
+            mgmtObject->inc_reroutes(count);
+            if (brokerMgmtObject) {
+                brokerMgmtObject->inc_acquires(count);
+                brokerMgmtObject->inc_reroutes(count);
             }
-            dequeue(0, *qmsg);
-            QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
-            // now reroute if necessary
-            if (dest.get()) {
-                assert(qmsg->payload);
-                qmsg->payload->clearTrace();
-                DeliverableMessage dmsg(qmsg->payload);
-                dest->routeWithAlternate(dmsg);
+        } else {
+            mgmtObject->inc_discardsPurge(count);
+            if (brokerMgmtObject) {
+                brokerMgmtObject->inc_acquires(count);
+                brokerMgmtObject->inc_discardsPurge(count);
             }
         }
     }
-    return c.matches.size();
+
+    return count;
 }
 
 uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
                      const qpid::types::Variant::Map *filter)
 {
     std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
-    Collector c(*mf.get(), qty);
-
-    {
-        Mutex::ScopedLock locker(messageLock);
-        messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
-    }
-
-
-    if (!c.matches.empty()) {
-        // Update observers and message state:
-
-        if (mgmtObject) {
-            mgmtObject->inc_acquires(c.matches.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_acquires(c.matches.size());
-        }
-
-        for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-             qmsg != c.matches.end(); ++qmsg) {
-            {
-                Mutex::ScopedLock locker(messageLock);
-                observeAcquire(*qmsg, locker);
-            }
-            dequeue(0, *qmsg);
-            // and move to destination Queue.
-            assert(qmsg->payload);
-            destq->deliver(qmsg->payload);
-        }
-    }
-    return c.matches.size();
+    return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/);
 }
 
-/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
+void Queue::push(Message& message, bool /*isRecovery*/)
 {
-    bool ok;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        ok = messages->acquire(position, msg);
-        if (ok) observeAcquire(msg, locker);
-    }
-    if (ok) {
-        if (mgmtObject) {
-            mgmtObject->inc_acquires();
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_acquires();
-        }
-        ++dequeueSincePurge;
-        return true;
-    }
-    return false;
-}
-
-void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    QueuedMessage removed, qm(this, msg);
-    bool dequeueRequired = false;
     {
         Mutex::ScopedLock locker(messageLock);
-        qm.position = ++sequence;
-        if (messages->push(qm, removed)) {
-            dequeueRequired = true;
-            observeAcquire(removed, locker);
-        }
-        observeEnqueue(qm, locker);
-        if (policy.get()) {
-            policy->enqueued(qm);
-        }
+        message.setSequence(++sequence);
+        messages->publish(message);
         listeners.populate(copy);
-    }
-    if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
-
-    mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
-
-    if (dequeueRequired) {
-        if (mgmtObject) {
-            mgmtObject->inc_acquires();
-            mgmtObject->inc_discardsLvq();
-            if (brokerMgmtObject) {
-                brokerMgmtObject->inc_acquires();
-                brokerMgmtObject->inc_discardsLvq();
-            }
-        }
-        if (isRecovery) {
-            //can't issue new requests for the store until
-            //recovery is complete
-            Mutex::ScopedLock locker(messageLock);
-            pendingDequeues.push_back(removed);
-        } else {
-            dequeue(0, removed);
-        }
+        observeEnqueue(message, locker);
     }
     copy.notify();
 }
 
-void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
-{
-    if (message.payload->isIngressComplete()) (*result)++;
-}
-
-/** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getEnqueueCompleteMessageCount() const
-{
-    uint32_t count = 0;
-    Mutex::ScopedLock locker(messageLock);
-    messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
-    return count;
-}
-
 uint32_t Queue::getMessageCount() const
 {
     Mutex::ScopedLock locker(messageLock);
@@ -949,7 +755,7 @@ uint32_t Queue::getConsumerCount() const
 bool Queue::canAutoDelete() const
 {
     Mutex::ScopedLock locker(messageLock);
-    return autodelete && !consumerCount && !owner;
+    return settings.autodelete && !consumerCount && !owner;
 }
 
 void Queue::clearLastNodeFailure()
@@ -957,14 +763,9 @@ void Queue::clearLastNodeFailure()
     inLastNodeFailure = false;
 }
 
-void Queue::forcePersistent(QueuedMessage& message)
+void Queue::forcePersistent(const Message& /*message*/)
 {
-    if(!message.payload->isStoredOnQueue(shared_from_this())) {
-        message.payload->forcePersistent();
-        if (message.payload->isForcedPersistent() ){
-            enqueue(0, message.payload);
-        }
-    }
+    //TODO
 }
 
 void Queue::setLastNodeFailure()
@@ -982,153 +783,129 @@ void Queue::setLastNodeFailure()
 }
 
 
-// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
+/*
+ * return true if enqueue succeeded and message should be made
+ * available; returning false will result in the message being dropped
+ */
+bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
 
-    if (policy.get() && !suppressPolicyCheck) {
-        std::deque<QueuedMessage> dequeues;
-        {
-            Mutex::ScopedLock locker(messageLock);
-            try {
-                policy->tryEnqueue(msg);
-            } catch(ResourceLimitExceededException&) {
-                if (mgmtObject) {
-                    mgmtObject->inc_discardsOverflow();
-                    if (brokerMgmtObject)
-                        brokerMgmtObject->inc_discardsOverflow();
-                }
-                throw;
-            }
-            policy->getPendingDequeues(dequeues);
-        }
-        //depending on policy, may have some dequeues that need to performed without holding the lock
-
-        //
-        // Count the dequeues as ring-discards.  We know that these aren't rejects because
-        // policy->tryEnqueue would have thrown an exception.
-        //
-        if (mgmtObject && !dequeues.empty()) {
-            mgmtObject->inc_discardsRing(dequeues.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsRing(dequeues.size());
+    {
+        Mutex::ScopedLock locker(messageLock);
+        if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+            return false;
         }
-
-        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 
     if (inLastNodeFailure && persistLastNode){
-        msg->forcePersistent();
+        forcePersistent(msg);
     }
 
-    if (traceId.size()) {
-        msg->addTraceId(traceId);
+    if (settings.traceId.size()) {
+        msg.addTraceId(settings.traceId);
     }
 
-    if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
+    if (msg.isPersistent() && store) {
         // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
         // when it considers the message stored.
-        msg->enqueueAsync(shared_from_this(), store);
-        boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+        boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
+        assert(pmsg);
+        pmsg->enqueueAsync(shared_from_this(), store);
         store->enqueue(ctxt, pmsg, *this);
-        return true;
     }
-    if (!store) {
-        //Messages enqueued on a transient queue should be prevented
-        //from having their content released as it may not be
-        //recoverable by these queue for delivery
-        msg->blockContentRelease();
-    }
-    return false;
+    return true;
 }
 
-void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+void Queue::enqueueAborted(const Message& msg)
 {
+    //Called when any transactional enqueue is aborted (including but
+    //not limited to a recovered dtx transaction)
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->enqueueAborted(msg);
+    current -= QueueDepth(1, msg.getContentSize());
 }
 
-// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
+void Queue::enqueueCommited(Message& msg)
 {
-    ScopedUse u(barrier);
-    if (!u.acquired) return false;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        if (!isEnqueued(msg)) return false;
-        if (!ctxt) {
-            if (policy.get()) policy->dequeued(msg);
-            messages->deleted(msg);
-            observeDequeue(msg, locker);
-        }
+    //called when a recovered dtx enqueue operation is committed; the
+    //message is already on disk and space has been reserved in policy
+    //but it should now be made available
+    process(msg);
+}
+void Queue::dequeueAborted(Message& msg)
+{
+    //called when a recovered dtx dequeue operation is aborted; the
+    //message should be added back to the queue
+    push(msg); //push() stamps the message with the next sequence number before publishing it
+}
+void Queue::dequeueCommited(const Message& msg)
+{
+    //called when a recovered dtx dequeue operation is committed; the
+    //message will at this point have already been removed from the
+    //store and will not be available for delivery. The only action
+    //required is to ensure the observers are notified and the
+    //management stats are correctly decremented
+    Mutex::ScopedLock locker(messageLock); //observeDequeue requires messageLock to be held
+    observeDequeue(msg, locker); //reduces the tracked depth, updates dequeue stats and notifies observers
+    if (mgmtObject != 0) { //NOTE(review): dequeueCommitted(const QueueCursor&) also updates brokerMgmtObject txn stats; confirm the omission here is intended
+        mgmtObject->inc_msgTxnDequeues();
+        mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
     }
+}
 
-    if (!ctxt) {
-        mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
-    }
-
-    // This check prevents messages which have been forced persistent on one queue from dequeuing
-    // from another on which no forcing has taken place and thus causing a store error.
-    bool fp = msg.payload->isForcedPersistent();
-    if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) {
-        if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
-            msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
-            boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
-            store->dequeue(ctxt, pmsg, *this);
-            return true;
-        }
+
+void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
+{
+    ScopedUse u(barrier);
+    if (u.acquired && msg && store) {
+        store->dequeue(0, msg, *this);
     }
-    return false;
 }
 
-void Queue::dequeueCommitted(const QueuedMessage& msg)
+void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
 {
+    ScopedUse u(barrier);
+    if (!u.acquired) return;
+    boost::intrusive_ptr<PersistableMessage> pmsg;
     {
         Mutex::ScopedLock locker(messageLock);
-        if (policy.get()) policy->dequeued(msg);
-        messages->deleted(msg);
-        observeDequeue(msg, locker);
+        Message* msg = messages->find(cursor);
+        if (msg) {
+            if (msg->isPersistent()) pmsg = msg->getPersistentContext();
+            if (!ctxt) {
+                observeDequeue(*msg, locker);
+                messages->deleted(cursor);//message pointer not valid after this
+            }
+        } else {
+            return;
+        }
     }
-    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
-    if (mgmtObject != 0) {
-        _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
-        const uint64_t contentSize = msg.payload->contentSize();
-        qStats->msgTxnDequeues  += 1;
-        qStats->byteTxnDequeues += contentSize;
-        mgmtObject->statisticsUpdated();
+    if (store && pmsg) {
+        store->dequeue(ctxt, pmsg, *this);
+    }
+}
+
+void Queue::dequeueCommitted(const QueueCursor& cursor)
+{
+    Mutex::ScopedLock locker(messageLock);
+    Message* msg = messages->find(cursor);
+    if (msg) {
+        const uint64_t contentSize = msg->getContentSize();
+        observeDequeue(*msg, locker);
+        if (mgmtObject != 0) {
+            mgmtObject->inc_msgTxnDequeues();
+            mgmtObject->inc_byteTxnDequeues(contentSize);
+        }
         if (brokerMgmtObject) {
             _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
             bStats->msgTxnDequeues += 1;
             bStats->byteTxnDequeues += contentSize;
             brokerMgmtObject->statisticsUpdated();
         }
-    }
-}
-
-/**
- * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
- * it from the logical (and persistent if applicable) queue
- */
-bool Queue::popAndDequeue(QueuedMessage& msg)
-{
-    bool popped;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        popped = messages->consume(msg);
-        if (popped) observeAcquire(msg, locker);
-    }
-    if (popped) {
-        if (mgmtObject) {
-            mgmtObject->inc_acquires();
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_acquires();
-        }
-        dequeue(0, msg);
-        return true;
+        messages->deleted(cursor);
     } else {
-        return false;
+        QPID_LOG(error, "Could not find dequeued message on commit");
     }
 }
 
@@ -1136,8 +913,10 @@ bool Queue::popAndDequeue(QueuedMessage&
  * Updates policy and management when a message has been dequeued,
  * Requires messageLock be held by caller.
  */
-void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
 {
+    current -= QueueDepth(1, msg.getContentSize());
+    mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->dequeued(msg);
@@ -1150,7 +929,7 @@ void Queue::observeDequeue(const QueuedM
 /** updates queue observers when a message has become unavailable for transfer.
  * Requires messageLock be held by caller.
  */
-void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1164,7 +943,7 @@ void Queue::observeAcquire(const QueuedM
 /** updates queue observers when a message has become re-available for transfer
  *  Requires messageLock be held by caller.
  */
-void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1202,13 +981,11 @@ void Queue::observeConsumerRemove( const
 }
 
 
-void Queue::create(const FieldTable& _settings)
+void Queue::create()
 {
-    settings = _settings;
     if (store) {
-        store->create(*this, _settings);
+        store->create(*this, settings.storeSettings);
     }
-    configureImpl(_settings);
 }
 
 
@@ -1258,112 +1035,21 @@ bool getBoolSetting(const qpid::framing:
     }
 }
 
-void Queue::configure(const FieldTable& _settings)
+void Queue::abandoned(const Message& message)
 {
-    settings = _settings;
-    configureImpl(settings);
-}
-
-void Queue::configureImpl(const FieldTable& _settings)
-{
-    eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-    if (eventMode && broker) {
-        broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
-    }
-
-    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
-        (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
-        if ( NullMessageStore::isNullStore(store)) {
-            QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
-        } else if (broker && !(broker->getQueueEvents().isSync()) ) {
-            QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
-        }
-        FieldTable copy(_settings);
-        copy.erase(QueuePolicy::typeKey);
-        setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
-    } else {
-        setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
-    }
-    if (broker && broker->getManagementAgent()) {
-        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio);
-    }
-
-    //set this regardless of owner to allow use of no-local with exclusive consumers also
-    noLocal = getBoolSetting(_settings, qpidNoLocal);
-    QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
-
-    std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
-    if (lvqKey.size()) {
-        QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value Queue with key " << lvqKey);
-        messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
-        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
-    } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
-        QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue with 'no-browse' on");
-        messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
-        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
-    } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
-        QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue");
-        messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
-        allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
-    } else {
-        std::auto_ptr<Messages> m = Fairshare::create(_settings);
-        if (m.get()) {
-            messages = m;
-            allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
-            QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
-        } else { // default (FIFO) queue type
-            // override default message allocator if message groups configured.
-            boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
-            if (mgm) {
-                allocator = mgm;
-                addObserver(mgm);
-            }
-        }
-    }
-
-    persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
-    if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
-
-    traceId = _settings.getAsString(qpidTraceIdentity);
-    std::string excludeList = _settings.getAsString(qpidTraceExclude);
-    if (excludeList.size()) {
-        split(traceExclude, excludeList, ", ");
-    }
-    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
-             << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
-
-    FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
-    if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
-
-    autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
-    if (autoDeleteTimeout)
-        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
-
-    if (mgmtObject != 0) {
-        mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
-    }
-
-    QueueFlowLimit::observe(*this, _settings);
+    if (reroute(alternateExchange, message) && brokerMgmtObject)
+        brokerMgmtObject->inc_abandonedViaAlt();
+    else if (brokerMgmtObject)
+        brokerMgmtObject->inc_abandoned();
 }
 
 void Queue::destroyed()
 {
     unbind(broker->getExchanges());
-
-    QueuedMessage m;
-    while(popAndDequeue(m)) {
-        DeliverableMessage msg(m.payload);
-        if (alternateExchange.get()) {
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_abandonedViaAlt();
-            alternateExchange->routeWithAlternate(msg);
-        } else {
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_abandoned();
-        }
-    }
-    if (alternateExchange.get())
+    remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/);
+    if (alternateExchange.get()) {
         alternateExchange->decAlternateUsers();
+    }
 
     if (store) {
         barrier.destroy();
@@ -1401,20 +1087,6 @@ void Queue::unbind(ExchangeRegistry& exc
     bindings.unbind(exchanges, shared_from_this());
 }
 
-void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
-{
-    Mutex::ScopedLock locker(messageLock);
-    policy = _policy;
-    if (policy.get())
-        policy->setQueue(this);
-}
-
-const QueuePolicy* Queue::getPolicy()
-{
-    Mutex::ScopedLock locker(messageLock);
-    return policy.get();
-}
-
 uint64_t Queue::getPersistenceId() const
 {
     return persistenceId;
@@ -1434,10 +1106,7 @@ void Queue::setPersistenceId(uint64_t _p
 void Queue::encode(Buffer& buffer) const
 {
     buffer.putShortString(name);
-    buffer.put(settings);
-    if (policy.get()) {
-        buffer.put(*policy);
-    }
+    buffer.put(encodableSettings);
     buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
 }
 
@@ -1445,21 +1114,19 @@ uint32_t Queue::encodedSize() const
 {
     return name.size() + 1/*short string size octet*/
         + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
-        + settings.encodedSize()
-        + (policy.get() ? (*policy).encodedSize() : 0);
+        + encodableSettings.encodedSize();
 }
 
 Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
 {
     string name;
     buffer.getShortString(name);
-    FieldTable settings;
-    buffer.get(settings);
+    FieldTable ft;
+    buffer.get(ft);
     boost::shared_ptr<Exchange> alternate;
-    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true);
-    if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
-        buffer.get ( *(result.first->policy) );
-    }
+    QueueSettings settings(true, false);
+    settings.populate(ft, settings.storeSettings);
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate, true);
     if (buffer.available()) {
         string altExch;
         buffer.getShortString(altExch);
@@ -1523,8 +1190,8 @@ struct AutoDeleteTask : qpid::sys::Timer
 
 void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
 {
-    if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
-        AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+    if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
+        AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
         queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
         broker.getClusterTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
@@ -1543,12 +1210,15 @@ void Queue::releaseExclusiveOwnership()
 {
     Mutex::ScopedLock locker(ownershipLock);
     owner = 0;
+    if (mgmtObject) {
+        mgmtObject->set_exclusive(false);
+    }
 }
 
 bool Queue::setExclusiveOwner(const OwnershipToken* const o)
 {
     //reset auto deletion timer if necessary
-    if (autoDeleteTimeout && autoDeleteTask) {
+    if (settings.autoDeleteDelay && autoDeleteTask) {
         autoDeleteTask->cancel();
     }
     Mutex::ScopedLock locker(ownershipLock);
@@ -1556,6 +1226,9 @@ bool Queue::setExclusiveOwner(const Owne
         return false;
     } else {
         owner = o;
+        if (mgmtObject) {
+            mgmtObject->set_exclusive(true);
+        }
         return true;
     }
 }
@@ -1687,7 +1360,7 @@ namespace {
 struct After {
     framing::SequenceNumber seq;
     After(framing::SequenceNumber s) : seq(s) {}
-    bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+    bool operator()(const Message& m) { return m.getSequence() > seq; }
 };
 } // namespace
 
@@ -1695,12 +1368,10 @@ struct After {
 void Queue::setPosition(SequenceNumber n) {
     Mutex::ScopedLock locker(messageLock);
     if (n < sequence) {
-        std::deque<QueuedMessage> dequeued;
-        dequeueIf(After(n), dequeued);
-        messages->setPosition(n);
+        remove(0, After(n), MessagePredicate(), BROWSER);
     }
     sequence = n;
-    QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
+    QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
 }
 
 SequenceNumber Queue::getPosition() {
@@ -1721,25 +1392,16 @@ void Queue::recoveryComplete(ExchangeReg
                       << "\": exchange does not exist.");
     }
     //process any pending dequeues
-    std::deque<QueuedMessage> pd;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        pendingDequeues.swap(pd);
+    for (std::vector<Message>::iterator i = pendingDequeues.begin(); i != pendingDequeues.end(); ++i) {
+        dequeueFromStore(i->getPersistentContext());
     }
-    for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
-}
-
-void Queue::insertSequenceNumbers(const std::string& key)
-{
-    seqNoKey = key;
-    insertSeqNo = !seqNoKey.empty();
-    QPID_LOG(debug, "Inserting sequence numbers as " << key);
+    pendingDequeues.clear();
 }
 
 /** updates queue observers and state when a message has become available for transfer
  *  Requires messageLock be held by caller.
  */
-void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1748,32 +1410,7 @@ void Queue::observeEnqueue(const QueuedM
             QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
         }
     }
-}
-
-void Queue::updateEnqueued(const QueuedMessage& m)
-{
-    if (m.payload) {
-        boost::intrusive_ptr<Message> payload = m.payload;
-        enqueue(0, payload, true);
-        {
-            Mutex::ScopedLock locker(messageLock);
-            messages->updateAcquired(m);
-            observeEnqueue(m, locker);
-            if (policy.get()) {
-                policy->recoverEnqueued(payload);
-                policy->enqueued(m);
-            }
-        }
-        mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
-    } else {
-        QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
-    }
-}
-
-bool Queue::isEnqueued(const QueuedMessage& msg)
-{
-    Mutex::ScopedLock locker(messageLock);
-    return !policy.get() || policy->isEnqueued(msg);
+    mgntEnqStats(m, mgmtObject, brokerMgmtObject);
 }
 
 // Note: accessing listeners outside of lock is dangerous.  Caller must ensure the queue's
@@ -1835,28 +1472,82 @@ void Queue::setDequeueSincePurge(uint32_
     dequeueSincePurge = value;
 }
 
-namespace{
-class FindLowest
+void Queue::reject(const QueueCursor& cursor)
 {
-  public:
-    FindLowest() : init(false) {}
-    void process(const QueuedMessage& message) {
-        QPID_LOG(debug, "FindLowest processing: " << message.position);
-        if (!init || message.position < lowest) lowest = message.position;
-        init = true;
-    }
-    bool getLowest(qpid::framing::SequenceNumber& result) {
-        if (init) {
-            result = lowest;
-            return true;
+    Exchange::shared_ptr alternate = getAlternateExchange();
+    Message copy;
+    boost::intrusive_ptr<PersistableMessage> pmsg;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        Message* message = messages->find(cursor);
+        if (message) {
+            if (alternate) copy = *message;
+            if (message->isPersistent()) pmsg = message->getPersistentContext();
+            countRejected();
+            observeDequeue(*message, locker);
+            messages->deleted(cursor);
         } else {
-            return false;
+            return;
         }
     }
-  private:
-    bool init;
-    qpid::framing::SequenceNumber lowest;
-};
+    if (alternate) {
+        copy.resetDeliveryCount();
+        DeliverableMessage delivery(copy, 0);
+        alternate->routeWithAlternate(delivery);
+        QPID_LOG(info, "Routed rejected message from " << getName() << " to "
+                 << alternate->getName());
+    } else {
+        //just drop it
+        QPID_LOG(info, "Dropping rejected message from " << getName());
+    }
+    dequeueFromStore(pmsg);
+}
+
+bool Queue::checkDepth(const QueueDepth& increment, const Message&) // Throws if (settings.maxDepth - current) is less than increment; otherwise adds increment to current and returns true. The Message argument is unused.
+{
+    if (current && (settings.maxDepth - current < increment)) { // the limit is only enforced when 'current' converts to true
+        if (mgmtObject) { // record the overflow discard on the queue's management object and, if set, the broker's
+            mgmtObject->inc_discardsOverflow();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_discardsOverflow();
+        }
+        throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
+    } else {
+        current += increment; // accepted: account for the new depth
+        return true;
+    }
+}
+
+bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate)
+{
+    Mutex::ScopedLock locker(messageLock);
+    // Step the cursor with next() until it yields a message the predicate accepts
+    // (an empty predicate accepts every message). The lock is held across the
+    // predicate calls; copying the message instead may suit future use cases.
+    Message* message = messages->next(cursor);
+    while (message && (predicate && !predicate(*message))) {
+        message = messages->next(cursor);
+    }
+    return message != 0; // false once next() returns null without an accepted message
+}
+
+bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start)
+{
+    Mutex::ScopedLock locker(messageLock);
+    // Try the message found at 'start' first; if there is none, or the predicate
+    // rejects it, fall back to seek(cursor, predicate). The lock is held across
+    // the predicate calls; copying the message instead may suit future use cases.
+    Message* message;
+    message = messages->find(start, &cursor);
+    if (message && (!predicate || predicate(*message))) return true; // an empty predicate accepts the message
+
+    return seek(cursor, predicate); // NOTE(review): the callee locks messageLock again while it is held here - presumably the mutex is recursive; confirm
+}
+
+bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start) // Looks up 'start' under messageLock, handing the cursor to find().
+{
+    Mutex::ScopedLock locker(messageLock);
+    return messages->find(start, &cursor); // the find() result is converted to bool for the return value
 }
 
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Aug 10 12:04:27 2012
@@ -28,12 +28,14 @@
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Messages.h"
 #include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
 #include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/TxOp.h"
 
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Timer.h"
@@ -56,10 +58,14 @@
 namespace qpid {
 namespace broker {
 class Broker;
+class Exchange;
 class MessageStore;
+class QueueDepth;
 class QueueEvents;
 class QueueRegistry;
+class QueueFactory;
 class TransactionContext;
+class TxBuffer;
 class MessageDistributor;
 
 /**
@@ -70,7 +76,9 @@ class MessageDistributor;
  */
 class Queue : public boost::enable_shared_from_this<Queue>,
               public PersistableQueue, public management::Manageable {
-
+  public:
+    typedef boost::function1<bool, const Message&> MessagePredicate;
+  protected:
     struct UsageBarrier
     {
         Queue& parent;
@@ -90,31 +98,40 @@ class Queue : public boost::enable_share
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
 
+    class TxPublish : public TxOp
+    {
+        Message message;
+        boost::shared_ptr<Queue> queue;
+        bool prepared;
+      public:
+        TxPublish(const Message&,boost::shared_ptr<Queue>);
+        bool prepare(TransactionContext* ctxt) throw();
+        void commit() throw();
+        void rollback() throw();
+    };
+
     typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+    typedef boost::function1<void, Message&> MessageFunctor;
 
     const std::string name;
-    const bool autodelete;
     MessageStore* store;
     const OwnershipToken* owner;
     uint32_t consumerCount;     // Actually a count of all subscriptions, acquiring or not.
     uint32_t browserCount;      // Count of non-acquiring subscriptions.
     OwnershipToken* exclusive;
-    bool noLocal;
     bool persistLastNode;
     bool inLastNodeFailure;
-    std::string traceId;
     std::vector<std::string> traceExclude;
     QueueListeners listeners;
     std::auto_ptr<Messages> messages;
-    std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
+    std::vector<Message> pendingDequeues;
     /** messageLock is used to keep the Queue's state consistent while processing message
      * events, such as message dispatch, enqueue, acquire, and dequeue.  It must be held
      * while updating certain members in order to keep these members consistent with
      * each other:
      *     o  messages
      *     o  sequence
-     *     o  policy
      *     o  listeners
      *     o  allocator
      *     o  observeXXX() methods
@@ -127,9 +144,9 @@ class Queue : public boost::enable_share
     mutable qpid::sys::Monitor messageLock;
     mutable qpid::sys::Mutex ownershipLock;
     mutable uint64_t persistenceId;
-    framing::FieldTable settings;
-    std::auto_ptr<QueuePolicy> policy;
-    bool policyExceeded;
+    const QueueSettings settings;
+    qpid::framing::FieldTable encodableSettings;
+    QueueDepth current;
     QueueBindings bindings;
     std::string alternateExchangeName;
     boost::shared_ptr<Exchange> alternateExchange;
@@ -139,43 +156,42 @@ class Queue : public boost::enable_share
     sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
     int eventMode;
     Observers observers;
-    bool insertSeqNo;
     std::string seqNoKey;
     Broker* broker;
     bool deleted;
     UsageBarrier barrier;
-    int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
     boost::shared_ptr<MessageDistributor> allocator;
 
-    void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
-    void setPolicy(std::auto_ptr<QueuePolicy> policy);
-    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
-    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
-    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
-    void notifyListener();
+    virtual void push(Message& msg, bool isRecovery=false);
+    void process(Message& msg);
+    bool enqueue(TransactionContext* ctxt, Message& msg);
+    bool getNextMessage(Message& msg, Consumer::shared_ptr& c);
 
     void removeListener(Consumer::shared_ptr);
 
-    bool isExcluded(boost::intrusive_ptr<Message>& msg);
+    bool isExcluded(const Message& msg);
 
-    /** update queue observers, stats, policy, etc when the messages' state changes.
-     * messageLock is held by caller */
-    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+     * must be held by caller */
+    void observeEnqueue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+    void observeAcquire(const Message& msg, const sys::Mutex::ScopedLock& lock);
+    void observeRequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+    void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
     void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
     void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
 
-    bool popAndDequeue(QueuedMessage&);
-    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
-    void forcePersistent(QueuedMessage& msg);
+    bool acquire(const qpid::framing::SequenceNumber& position, Message& msg,
+                 const qpid::sys::Mutex::ScopedLock& locker);
+
+    void forcePersistent(const Message& msg);
     int getEventMode();
-    void configureImpl(const qpid::framing::FieldTable& settings);
-    void checkNotDeleted(const Consumer::shared_ptr& c);
+    void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
+    void abandoned(const Message& message);
+    void checkNotDeleted(const Consumer::shared_ptr&);
     void notifyDeleted();
-    void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
+    uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
+    virtual bool checkDepth(const QueueDepth& increment, const Message&);
 
   public:
 
@@ -184,12 +200,11 @@ class Queue : public boost::enable_share
     typedef std::vector<shared_ptr> vector;
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
-                             bool autodelete = false,
+                             const QueueSettings& settings = QueueSettings(),
                              MessageStore* const store = 0,
-                             const OwnershipToken* const owner = 0,
                              management::Manageable* parent = 0,
                              Broker* broker = 0);
-    QPID_BROKER_EXTERN ~Queue();
+    QPID_BROKER_EXTERN virtual ~Queue();
 
     /** allow the Consumer to consume or browse the next available message */
     QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
@@ -198,19 +213,13 @@ class Queue : public boost::enable_share
      * @param msg - message to be acquired.
      * @return false if message is no longer available for acquire.
      */
-    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
+    QPID_BROKER_EXTERN bool acquire(const QueueCursor& msg, const std::string& consumer);
 
     /**
-     * Used to configure a new queue and create a persistent record
-     * for it in store if required.
+     * Used to create a persistent record for the queue in store if required.
      */
-    QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
+    QPID_BROKER_EXTERN void create();
 
-    /**
-     * Used to reconfigure a recovered queue (does not create
-     * persistent record in store).
-     */
-    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
     void destroyed();
     QPID_BROKER_EXTERN void bound(const std::string& exchange,
                                   const std::string& key,
@@ -224,34 +233,36 @@ class Queue : public boost::enable_share
         boost::shared_ptr<Exchange> exchange, const std::string& key,
         const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
 
-    /** Acquire the message at the given position if it is available for acquire.  Not to
-     * be used by clients, but used by the broker for queue management.
-     * @param message - set to the acquired message if true returned.
-     * @return true if the message has been acquired.
+    /**
+     * Removes (and dequeues) a message by its sequence number (used
+     * for some broker features, e.g. queue replication)
+     *
+     * @param position the sequence number of the message to be dequeued.
+     * @return true if the message is dequeued.
      */
-    QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
+    QPID_BROKER_EXTERN bool dequeueMessageAt(const qpid::framing::SequenceNumber& position);
 
     /**
      * Delivers a message to the queue. Will record it as
      * enqueued if persistent then process it.
      */
-    QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
-    /**
-     * Dispatches the messages immediately to a consumer if
-     * one is available or stores it for later if not.
-     */
-    QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
+    QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
     /**
      * Returns a message to the in-memory queue (due to lack
      * of acknowledegement from a receiver). If a consumer is
      * available it will be dispatched immediately, else it
      * will be returned to the front of the queue.
      */
-    QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
+    QPID_BROKER_EXTERN void release(const QueueCursor& msg, bool markRedelivered=true);
+    QPID_BROKER_EXTERN void reject(const QueueCursor& msg);
+
+    QPID_BROKER_EXTERN bool seek(QueueCursor&, MessagePredicate);
+    QPID_BROKER_EXTERN bool seek(QueueCursor&, MessagePredicate, qpid::framing::SequenceNumber start);
+    QPID_BROKER_EXTERN bool seek(QueueCursor&, qpid::framing::SequenceNumber start);
     /**
      * Used during recovery to add stored messages back to the queue
      */
-    QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
+    QPID_BROKER_EXTERN void recover(Message& msg);
 
     QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
                                     bool exclusive = false);
@@ -268,7 +279,6 @@ class Queue : public boost::enable_share
         const qpid::types::Variant::Map *filter=0);
 
     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
-    QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
     QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
     inline const std::string& getName() const { return name; }
     QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
@@ -277,8 +287,9 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
     QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
     inline bool isDurable() const { return store != 0; }
-    inline const framing::FieldTable& getSettings() const { return settings; }
-    inline bool isAutoDelete() const { return autodelete; }
+    inline const QueueSettings& getSettings() const { return settings; }
+    inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
+    inline bool isAutoDelete() const { return settings.autodelete; }
     QPID_BROKER_EXTERN bool canAutoDelete() const;
     const QueueBindings& getBindings() const { return bindings; }
 
@@ -288,48 +299,22 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN void setLastNodeFailure();
     QPID_BROKER_EXTERN void clearLastNodeFailure();
 
-    QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
-    QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
     /**
      * dequeue from store (only done once messages is acknowledged)
      */
-    QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+    QPID_BROKER_EXTERN void dequeue(TransactionContext* ctxt, const QueueCursor&);
     /**
      * Inform the queue that a previous transactional dequeue
      * committed.
      */
-    QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg);
-
-    /**
-     * Inform queue of messages that were enqueued, have since
-     * been acquired but not yet accepted or released (and
-     * thus are still logically on the queue) - used in
-     * clustered broker.
-     */
-    QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg);
-
-    /**
-     * Test whether the specified message (identified by its
-     * sequence/position), is still enqueued (note this
-     * doesn't mean it is available for delivery as it may
-     * have been delievered to a subscriber who has not yet
-     * accepted it).
-     */
-    QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg);
-
-    /**
-     * Acquires the next available (oldest) message
-     */
-    QPID_BROKER_EXTERN QueuedMessage get();
+    void dequeueCommitted(const QueueCursor& msg);
 
     /** Get the message at position pos, returns true if found and sets msg */
-    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
-
-    QPID_BROKER_EXTERN const QueuePolicy* getPolicy();
+    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, Message& msg ) const;
 
     QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
     QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
-    QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr<Message>& msg);
+    QPID_BROKER_EXTERN bool isLocal(const Message& msg);
 
     //PersistableQueue support:
     QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
@@ -410,7 +395,11 @@ class Queue : public boost::enable_share
      * Reserve space in policy for an enqueued message that
      * has been recovered in the prepared state (dtx only)
      */
-    QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr<Message>& msg);
+    QPID_BROKER_EXTERN void recoverPrepared(const Message& msg);
+    void enqueueAborted(const Message& msg);
+    void enqueueCommited(Message& msg);
+    void dequeueAborted(Message& msg);
+    void dequeueCommited(const Message& msg);
 
     QPID_BROKER_EXTERN void flush();
 
@@ -418,6 +407,7 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+  friend class QueueFactory;
 };
 }
 }

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp (from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp&r1=1371647&r2=1371676&rev=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp Fri Aug 10 12:04:27 2012
@@ -18,21 +18,27 @@
  * under the License.
  *
  */
-#include "qpid/broker/ExpiryPolicy.h"
+#include "QueueCursor.h"
 #include "qpid/broker/Message.h"
-#include "qpid/sys/Time.h"
 
 namespace qpid {
 namespace broker {
+QueueCursor::QueueCursor(SubscriptionType t) : type(t), position(0), version(0), valid(false) {} // a new cursor is not valid until setPosition() is called
 
-ExpiryPolicy::~ExpiryPolicy() {}
-
-bool ExpiryPolicy::hasExpired(Message& m) {
-    return m.getExpiration() < sys::AbsTime::now();
+void QueueCursor::setPosition(int32_t p, int32_t v) // Records position p and version v, and marks the cursor valid.
+{
+    position = p;
+    version = v;
+    valid = true;
 }
 
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
-    return sys::AbsTime::now();
+bool QueueCursor::check(const Message& m) // True if m is AVAILABLE, or if m is ACQUIRED and this cursor's type is REPLICATOR or PURGE.
+{
+    return (m.getState() == AVAILABLE || ((type == REPLICATOR || type == PURGE) && m.getState() == ACQUIRED));
 }
 
+bool QueueCursor::isValid(int32_t v) // Returns true only while the cursor is valid and v equals its recorded version.
+{
+    return valid && (valid = (v == version)); // assignment is intentional: a version mismatch clears 'valid' until setPosition() is called again
+}
 }} // namespace qpid::broker

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h (from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h&r1=1371647&r2=1371676&rev=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h Fri Aug 10 12:04:27 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_EXPIRYPOLICY_H
-#define QPID_BROKER_EXPIRYPOLICY_H
+#ifndef QPID_BROKER_QUEUECURSOR_H
+#define QPID_BROKER_QUEUECURSOR_H
 
 /*
  *
@@ -21,30 +21,51 @@
  * under the License.
  *
  */
-
-#include "qpid/RefCounted.h"
 #include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
-
-namespace sys {
-class AbsTime;
-}
-
 namespace broker {
 
 class Message;
 
+enum SubscriptionType // Type tag stored in a QueueCursor (passed to its constructor).
+{
+    CONSUMER, // default type for a QueueCursor
+    BROWSER,
+    PURGE, // QueueCursor::check() also accepts ACQUIRED messages for this type
+    REPLICATOR // QueueCursor::check() also accepts ACQUIRED messages for this type
+};
+
+class CursorContext { // Polymorphic base with only a virtual destructor; QueueCursor holds one in a boost::shared_ptr.
+  public:
+    virtual ~CursorContext() {}
+};
 /**
- * Default expiry policy.
+ * Position marker into a queue's messages, tagged with a subscription type.
  */
-class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
+class QueueCursor
 {
   public:
-    QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
-    QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
-    QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
+    QPID_BROKER_EXTERN QueueCursor(SubscriptionType type = CONSUMER);
+
+  private:
+    SubscriptionType type;
+    int32_t position;
+    int32_t version;
+    bool valid;
+    boost::shared_ptr<CursorContext> context;
+
+    void setPosition(int32_t p, int32_t v);
+    bool check(const Message& m);
+    bool isValid(int32_t v);
+
+  friend class MessageDeque;
+  friend class MessageMap;
+  friend class PriorityQueue;
+  template <typename T> friend class IndexedDeque;
 };
 }} // namespace qpid::broker
 
-#endif  /*!QPID_BROKER_EXPIRYPOLICY_H*/
+#endif  /*!QPID_BROKER_QUEUECURSOR_H*/



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org