You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC

svn commit: r1377715 [5/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/ cpp/src/qpid/asyncStore/ cpp/src/qpid...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Aug 27 15:40:33 2012
@@ -20,7 +20,9 @@
  */
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -57,34 +59,6 @@ namespace {
                                                     << "=" << max));
         }
     }
-
-    /** extract a capacity value as passed in an argument map
-     */
-    uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
-    {
-        FieldTable::ValuePtr v = settings.get(key);
-
-        int64_t result = 0;
-
-        if (!v) return defaultValue;
-        if (v->getType() == 0x23) {
-            QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
-        } else if (v->getType() == 0x33) {
-            QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
-        } else if (v->convertsTo<int64_t>()) {
-            result = v->get<int64_t>();
-            QPID_LOG(debug, "Got integer value for " << key << ": " << result);
-            if (result >= 0) return result;
-        } else if (v->convertsTo<std::string>()) {
-            std::string s(v->get<std::string>());
-            QPID_LOG(debug, "Got string value for " << key << ": " << s);
-            std::istringstream convert(s);
-            if (convert >> result && result >= 0) return result;
-        }
-
-        QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
-        return defaultValue;
-    }
 }
 
 
@@ -102,10 +76,8 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
 
     if (queue) {
         queueName = _queue->getName();
-        if (queue->getPolicy()) {
-            maxSize = _queue->getPolicy()->getMaxSize();
-            maxCount = _queue->getPolicy()->getMaxCount();
-        }
+        if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
+        if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
         broker = queue->getBroker();
         queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
         if (queueMgmtObj) {
@@ -125,23 +97,23 @@ QueueFlowLimit::~QueueFlowLimit()
     sys::Mutex::ScopedLock l(indexLock);
     if (!index.empty()) {
         // we're gone - release all pending msgs
-        for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+        for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
              itr != index.end(); ++itr)
             if (itr->second)
                 try {
-                    itr->second->getIngressCompletion().finishCompleter();
+                    itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
                 } catch (...) {}    // ignore - not safe for a destructor to throw.
         index.clear();
     }
 }
 
 
-void QueueFlowLimit::enqueued(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const Message& msg)
 {
     sys::Mutex::ScopedLock l(indexLock);
 
     ++count;
-    size += msg.payload->contentSize();
+    size += msg.getContentSize();
 
     if (!flowStopped) {
         if (flowStopCount && count > flowStopCount) {
@@ -160,13 +132,13 @@ void QueueFlowLimit::enqueued(const Queu
     if (flowStopped || !index.empty()) {
         // ignore flow control if we are populating the queue due to cluster replication:
         if (broker && broker->isClusterUpdatee()) {
-            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
             return;
         }
-        QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
-        msg.payload->getIngressCompletion().startCompleter();    // don't complete until flow resumes
+        QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
+        msg.getPersistentContext()->getIngressCompletion().startCompleter();    // don't complete until flow resumes
         bool unique;
-        unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+        unique = index.insert(std::pair<framing::SequenceNumber, Message >(msg.getSequence(), msg)).second;
         // Like this to avoid tripping up unused variable warning when NDEBUG set
         if (!unique) assert(unique);
     }
@@ -174,7 +146,7 @@ void QueueFlowLimit::enqueued(const Queu
 
 
 
-void QueueFlowLimit::dequeued(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const Message& msg)
 {
     sys::Mutex::ScopedLock l(indexLock);
 
@@ -184,7 +156,7 @@ void QueueFlowLimit::dequeued(const Queu
         throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
     }
 
-    uint64_t _size = msg.payload->contentSize();
+    uint64_t _size = msg.getContentSize();
     if (_size <= size) {
         size -= _size;
     } else {
@@ -203,16 +175,16 @@ void QueueFlowLimit::dequeued(const Queu
     if (!index.empty()) {
         if (!flowStopped) {
             // flow enabled - release all pending msgs
-            for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+            for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
                  itr != index.end(); ++itr)
                 if (itr->second)
-                    itr->second->getIngressCompletion().finishCompleter();
+                    itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
             index.clear();
         } else {
             // even if flow controlled, we must release this msg as it is being dequeued
-            std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
+            std::map<framing::SequenceNumber, Message >::iterator itr = index.find(msg.getSequence());
             if (itr != index.end()) {       // this msg is flow controlled, release it:
-                msg.payload->getIngressCompletion().finishCompleter();
+                msg.getPersistentContext()->getIngressCompletion().finishCompleter();
                 index.erase(itr);
             }
         }
@@ -279,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_
 }
 
 
-void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings)
+void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
 {
     QueueFlowLimit *ptr = createLimit( &queue, settings );
     if (ptr) {
@@ -289,36 +261,37 @@ void QueueFlowLimit::observe(Queue& queu
 }
 
 /** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
 {
-    std::string type(QueuePolicy::getType(settings));
-
-    if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+    if (settings.dropMessagesAtLimit) {
         // The size of a RING queue is limited by design - no need for flow control.
         return 0;
     }
 
-    if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
-        settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
+    if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
         // user provided (some) flow settings manually...
-        uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
-        uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
-        uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
-        uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
-        if (flowStopCount == 0 && flowStopSize == 0) {   // disable flow control
+        if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
+            return new QueueFlowLimit(queue,
+                                      settings.flowStop.getCount(),
+                                      settings.flowResume.getCount(),
+                                      settings.flowStop.getSize(),
+                                      settings.flowResume.getSize());
+        } else {
+            //don't have a non-zero value for either the count or the
+            //size to stop at, yet at least one of these settings was
+            //provided, i.e it was set to 0 explicitly which we treat
+            //as turning it off
             return 0;
         }
-        return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
 
     if (defaultFlowStopRatio) {   // broker has a default ratio setup...
-        uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+        uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
         uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
         uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
-        uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0);  // no size by default
+        uint32_t maxMsgCount =  settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
         uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
         uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-
         return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
     return 0;
@@ -346,7 +319,7 @@ void QueueFlowLimit::getState(qpid::fram
     framing::SequenceSet ss;
     if (!index.empty()) {
         /* replicate the set of messages pending flow control */
-        for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+        for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
              itr != index.end(); ++itr) {
             ss.add(itr->first);
         }
@@ -377,10 +350,10 @@ void QueueFlowLimit::setState(const qpid
             ++i;
             fcmsg.add(first, last);
             for (SequenceNumber seq = first; seq <= last; ++seq) {
-                QueuedMessage msg;
+                Message msg;
                 queue->find(seq, msg);   // fyi: may not be found if msg is acquired & unacked
                 bool unique;
-                unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+                unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
                 // Like this to avoid tripping up unused variable warning when NDEBUG set
                 if (!unique) assert(unique);
             }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h Mon Aug 27 15:40:33 2012
@@ -26,9 +26,9 @@
 #include <iostream>
 #include <memory>
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/QueuedMessage.h"
 #include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Mutex.h"
 
@@ -45,6 +45,8 @@ namespace qpid {
 namespace broker {
 
 class Broker;
+class Queue;
+struct QueueSettings;
 
 /**
  * Producer flow control: when level is > flowStop*, flow control is ON.
@@ -80,13 +82,13 @@ class Broker;
 
     QPID_BROKER_EXTERN virtual ~QueueFlowLimit();
 
-    /** the queue has added QueuedMessage.  Returns true if flow state changes */
-    QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
-    /** the queue has removed QueuedMessage.  Returns true if flow state changes */
-    QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+    /** the queue has added QueuedMessage */
+    QPID_BROKER_EXTERN void enqueued(const Message&);
+    /** the queue has removed QueuedMessage */
+    QPID_BROKER_EXTERN void dequeued(const Message&);
     /** ignored */
-    QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {};
-    QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
+    QPID_BROKER_EXTERN void acquired(const Message&) {};
+    QPID_BROKER_EXTERN void requeued(const Message&) {};
 
     /** for clustering: */
     QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
@@ -106,14 +108,14 @@ class Broker;
     void decode(framing::Buffer& buffer);
     uint32_t encodedSize() const;
 
-    static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings);
+    static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings);
     static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
 
     friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
 
  protected:
     // msgs waiting for flow to become available.
-    std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
+    std::map<framing::SequenceNumber, Message > index;
     mutable qpid::sys::Mutex indexLock;
 
     _qmfBroker::Queue *queueMgmtObj;
@@ -123,7 +125,7 @@ class Broker;
     QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue,
                    uint32_t flowStopCount, uint32_t flowResumeCount,
                    uint64_t flowStopSize,  uint64_t flowResumeSize);
-    static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+    static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings);
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h Mon Aug 27 15:40:33 2012
@@ -24,8 +24,8 @@
 namespace qpid {
 namespace broker {
 
-struct QueuedMessage;
 class Consumer;
+class Message;
 
 /**
  * Interface for notifying classes who want to act as 'observers' of a queue of particular
@@ -63,10 +63,10 @@ class QueueObserver
     virtual ~QueueObserver() {}
 
     // note: the Queue will hold the messageLock while calling these methods!
-    virtual void enqueued(const QueuedMessage&) = 0;
-    virtual void dequeued(const QueuedMessage&) = 0;
-    virtual void acquired(const QueuedMessage&) = 0;
-    virtual void requeued(const QueuedMessage&) = 0;
+    virtual void enqueued(const Message&) = 0;
+    virtual void dequeued(const Message&) = 0;
+    virtual void acquired(const Message&) = 0;
+    virtual void requeued(const Message&) = 0;
     virtual void consumerAdded( const Consumer& ) {};
     virtual void consumerRemoved( const Consumer& ) {};
  private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp Mon Aug 27 15:40:33 2012
@@ -21,7 +21,6 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -32,50 +31,43 @@ using namespace qpid::broker;
 using namespace qpid::sys;
 using std::string;
 
-QueueRegistry::QueueRegistry(Broker* b) :
-    counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
+QueueRegistry::QueueRegistry(Broker* b)
+{
+    setBroker(b);
+}
 
 QueueRegistry::~QueueRegistry(){}
 
 std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, 
-                       bool autoDelete, const OwnershipToken* owner,
+QueueRegistry::declare(const string& name, const QueueSettings& settings,
                        boost::shared_ptr<Exchange> alternate,
-                       const qpid::framing::FieldTable& arguments,
                        bool recovering/*true if this declare is a
                                         result of recovering queue
-                                        definition from persistente
+                                        definition from persistent
                                         record*/)
 {
-    Queue::shared_ptr queue;
     std::pair<Queue::shared_ptr, bool> result;
     {
         RWlock::ScopedWlock locker(lock);
-        string name = declareName.empty() ? generateName() : declareName;
-        assert(!name.empty());
         QueueMap::iterator i =  queues.find(name);
-
         if (i == queues.end()) {
-            queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+            Queue::shared_ptr queue = create(name, settings);
+            //Move this to factory also?
             if (alternate) {
                 queue->setAlternateExchange(alternate);//need to do this *before* create
                 alternate->incAlternateUsers();
             }
             if (!recovering) {
-                //apply settings & create persistent record if required
-                queue->create(arguments);
-            } else {
-                //i.e. recovering a queue for which we already have a persistent record
-                queue->configure(arguments);
+                //create persistent record if required
+                queue->create();
             }
             queues[name] = queue;
-            if (lastNode) queue->setLastNodeFailure();
             result = std::pair<Queue::shared_ptr, bool>(queue, true);
         } else {
             result = std::pair<Queue::shared_ptr, bool>(i->second, false);
         }
     }
-    if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+    if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
     return result;
 }
 
@@ -85,11 +77,11 @@ void QueueRegistry::destroy(const string
         qpid::sys::RWlock::ScopedWlock locker(lock);
         QueueMap::iterator i = queues.find(name);
         if (i != queues.end()) {
-            Queue::shared_ptr q = i->second;
+            q = i->second;
             queues.erase(i);
         }
     }
-    if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
+    if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){
@@ -108,36 +100,17 @@ Queue::shared_ptr QueueRegistry::get(con
     return q;
 }
 
-string QueueRegistry::generateName(){
-    string name;
-    do {
-        std::stringstream ss;
-        ss << "tmp_" << counter++;
-        name = ss.str();
-        // Thread safety: Private function, only called with lock held
-        // so this is OK.
-    } while(queues.find(name) != queues.end());
-    return name;
-}
-
 void QueueRegistry::setStore (MessageStore* _store)
 {
-    store = _store;
+    QueueFactory::setStore(_store);
 }
 
-MessageStore* QueueRegistry::getStore() const {
-    return store;
+MessageStore* QueueRegistry::getStore() const
+{
+    return QueueFactory::getStore();
 }
 
-void QueueRegistry::updateQueueClusterState(bool _lastNode)
+void QueueRegistry::setParent(qpid::management::Manageable* _parent)
 {
-    RWlock::ScopedRlock locker(lock);
-    for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
-        if (_lastNode){
-            i->second->setLastNodeFailure();
-        } else {
-            i->second->clearLastNodeFailure();
-        }
-    }
-    lastNode = _lastNode;
+    QueueFactory::setParent(_parent);
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h Mon Aug 27 15:40:33 2012
@@ -22,8 +22,8 @@
 #define _QueueRegistry_
 
 #include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueueFactory.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/management/Manageable.h"
 #include "qpid/framing/FieldTable.h"
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
@@ -34,11 +34,8 @@ namespace qpid {
 namespace broker {
 
 class Queue;
-class QueueEvents;
 class Exchange;
 class OwnershipToken;
-class Broker;
-class MessageStore;
 
 /**
  * A registry of queues indexed by queue name.
@@ -47,7 +44,7 @@ class MessageStore;
  * are deleted when and only when they are no longer in use.
  *
  */
-class QueueRegistry {
+class QueueRegistry : QueueFactory {
   public:
     QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0);
     QPID_BROKER_EXTERN ~QueueRegistry();
@@ -60,11 +57,8 @@ class QueueRegistry {
      */
     QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
         const std::string& name,
-        bool durable = false,
-        bool autodelete = false,
-        const OwnershipToken* owner = 0,
+        const QueueSettings& settings,
         boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
-        const qpid::framing::FieldTable& args = framing::FieldTable(),
         bool recovering = false);
 
     /**
@@ -101,11 +95,6 @@ class QueueRegistry {
     QPID_BROKER_EXTERN boost::shared_ptr<Queue> get(const std::string& name);
 
     /**
-     * Generate unique queue name.
-     */
-    std::string generateName();
-
-    /**
      * Set the store to use.  May only be called once.
      */
     void setStore (MessageStore*);
@@ -118,7 +107,7 @@ class QueueRegistry {
     /**
      * Register the manageable parent for declared queues
      */
-    void setParent (management::Manageable* _parent) { parent = _parent; }
+    void setParent (management::Manageable*);
 
     /** Call f for each queue in the registry. */
     template <class F> void eachQueue(F f) const {
@@ -127,22 +116,10 @@ class QueueRegistry {
             f(i->second);
     }
 
-	/**
-	* Change queue mode when cluster size drops to 1 node, expands again
-	* in practice allows flow queue to disk when last name to be exectuted
-	*/
-	void updateQueueClusterState(bool lastNode);
-
 private:
     typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
     QueueMap queues;
     mutable qpid::sys::RWlock lock;
-    int counter;
-    MessageStore* store;
-    QueueEvents* events;
-    management::Manageable* parent;
-    bool lastNode; //used to set mode on queue declare
-    Broker* broker;
 };
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h Mon Aug 27 15:40:33 2012
@@ -22,8 +22,8 @@
 #define _QueuedMessage_
 
 #include "qpid/broker/Message.h"
-#include "BrokerImportExport.h"
-#include <iosfwd>
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/broker/BrokerImportExport.h"
 
 namespace qpid {
 namespace broker {
@@ -32,20 +32,19 @@ class Queue;
 
 struct QueuedMessage
 {
-    boost::intrusive_ptr<Message> payload;
+    Message message;
     framing::SequenceNumber position;
-    typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status;
-    Status status;
+    enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
     Queue* queue;
 
-    QueuedMessage(Queue* q=0,
-                  boost::intrusive_ptr<Message> msg=0,
-                  framing::SequenceNumber sn=0,
-                  Status st=AVAILABLE
-    ) :  payload(msg), position(sn), status(st), queue(q) {}
+    QueuedMessage() : queue(0) {}
+    QueuedMessage(Queue* q, Message msg, framing::SequenceNumber sn) :
+        message(msg), position(sn), queue(q) {}
+    QueuedMessage(Queue* q) : queue(q) {}
 };
 
-inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
+inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
+{
     return a.position < b.position;
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp Mon Aug 27 15:40:33 2012
@@ -22,10 +22,9 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/RecoveredDequeue.h"
 
-using boost::intrusive_ptr;
 using namespace qpid::broker;
 
-RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
 {
     queue->recoverPrepared(msg);
 }
@@ -38,11 +37,11 @@ bool RecoveredDequeue::prepare(Transacti
 
 void RecoveredDequeue::commit() throw()
 {
-    queue->enqueueAborted(msg);
+    queue->dequeueCommited(msg);
 }
 
 void RecoveredDequeue::rollback() throw()
 {
-    queue->process(msg);
+    queue->dequeueAborted(msg);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h Mon Aug 27 15:40:33 2012
@@ -26,8 +26,6 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/TxOp.h"
 
-#include <boost/intrusive_ptr.hpp>
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -36,18 +34,17 @@ namespace qpid {
     namespace broker {
         class RecoveredDequeue : public TxOp{
             boost::shared_ptr<Queue> queue;
-            boost::intrusive_ptr<Message> msg;
+            Message msg;
 
         public:
-            RecoveredDequeue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+            RecoveredDequeue(boost::shared_ptr<Queue> queue, Message msg);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~RecoveredDequeue(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
             boost::shared_ptr<Queue> getQueue() const { return queue; }
-            boost::intrusive_ptr<Message> getMessage() const { return msg; }
+            Message getMessage() const { return msg; }
         };
     }
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp Mon Aug 27 15:40:33 2012
@@ -22,10 +22,9 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/RecoveredEnqueue.h"
 
-using boost::intrusive_ptr;
 using namespace qpid::broker;
 
-RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
 {
     queue->recoverPrepared(msg);
 }
@@ -36,7 +35,7 @@ bool RecoveredEnqueue::prepare(Transacti
 }
 
 void RecoveredEnqueue::commit() throw(){
-    queue->process(msg);
+    queue->enqueueCommited(msg);
 }
 
 void RecoveredEnqueue::rollback() throw(){

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h Mon Aug 27 15:40:33 2012
@@ -26,8 +26,6 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/TxOp.h"
 
-#include <boost/intrusive_ptr.hpp>
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -36,19 +34,17 @@ namespace qpid {
 namespace broker {
 class RecoveredEnqueue : public TxOp{
     boost::shared_ptr<Queue> queue;
-    boost::intrusive_ptr<Message> msg;
+    Message msg;
 
   public:
-    RecoveredEnqueue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+    RecoveredEnqueue(boost::shared_ptr<Queue> queue, Message msg);
     virtual bool prepare(TransactionContext* ctxt) throw();
     virtual void commit() throw();
     virtual void rollback() throw();
     virtual ~RecoveredEnqueue(){}
-    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
     boost::shared_ptr<Queue> getQueue() const { return queue; }
-    boost::intrusive_ptr<Message> getMessage() const { return msg; }
-            
+    Message getMessage() const { return msg; }
 };
 }
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon Aug 27 15:40:33 2012
@@ -21,11 +21,13 @@
 #include "qpid/broker/RecoveryManagerImpl.h"
 
 #include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/reply_exceptions.h"
 
 using boost::dynamic_pointer_cast;
@@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImp
 
 class RecoverableMessageImpl : public RecoverableMessage
 {
-    intrusive_ptr<Message> msg;
+    Message msg;
 public:
-    RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
+    RecoverableMessageImpl(const Message& _msg);
     ~RecoverableMessageImpl() {};
     void setPersistenceId(uint64_t id);
     void setRedelivered();
@@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryMan
 
 RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
 {
-    boost::intrusive_ptr<Message> message(new Message());
-    message->decodeHeader(buffer);
-    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
+    //TODO: determine encoding/version actually used
+    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+    transfer->decodeHeader(buffer);
+    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
 }
 
 RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, 
@@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComple
     exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
 }
 
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
-{
-    if (!msg->isPersistent()) {
-        msg->forcePersistent(); // set so that message will get dequeued from store.
-    }
-}
+RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {}
 
 bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
 {
@@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent
 
 void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
 {
-    msg->decodeContent(buffer);
+    msg.getPersistentContext()->decodeContent(buffer);
 }
 
 void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
@@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Que
 
 void RecoverableMessageImpl::setPersistenceId(uint64_t id)
 {
-    msg->setPersistenceId(id);
+    msg.getPersistentContext()->setPersistenceId(id);
 }
 
 void RecoverableMessageImpl::setRedelivered()
 {
-    msg->redeliver();
+    msg.deliver();//increment delivery count (but at present that isn't recorded durably)
 }
 
 void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
@@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenc
 {
     queue->setPersistenceId(id);
 }
-       
+
 uint64_t RecoverableQueueImpl::getPersistenceId() const
 {
 	return queue->getPersistenceId();

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp Mon Aug 27 15:40:33 2012
@@ -29,7 +29,7 @@
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/TxAccept.h"
-#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/SequenceSet.h"
@@ -65,9 +65,8 @@ using qpid::management::Manageable;
 using qpid::management::Args;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
-SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
+SemanticState::SemanticState(SessionState& ss)
     : session(ss),
-      deliveryAdapter(da),
       tagGenerator("sgen"),
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
@@ -89,7 +88,7 @@ void SemanticState::closed() {
         if (dtxBuffer.get()) {
             dtxBuffer->fail();
         }
-        recover(true);
+        requeue();
 
         //now unsubscribe, which may trigger queue deletion and thus
         //needs to occur after the requeueing of unacked messages
@@ -124,7 +123,7 @@ void SemanticState::consume(const string
                          resumeId, resumeTtl, arguments);
     if (!c)                     // Create plain consumer
         c = ConsumerImpl::shared_ptr(
-            new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+            new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
                              resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
@@ -281,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           const string& _name,
                                           Queue::shared_ptr _queue,
                                           bool ack,
-                                          bool _acquire,
+                                          SubscriptionType type,
                                           bool _exclusive,
                                           const string& _tag,
                                           const string& _resumeId,
@@ -289,11 +288,11 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           const framing::FieldTable& _arguments
 
 ) :
-    Consumer(_name, _acquire),
+Consumer(_name, type),
     parent(_parent),
     queue(_queue),
     ackExpected(ack),
-    acquire(_acquire),
+    acquire(type == CONSUMER),
     blocked(true),
     exclusive(_exclusive),
     resumeId(_resumeId),
@@ -340,32 +339,42 @@ OwnershipToken* SemanticState::ConsumerI
     return &(parent->session);
 }
 
-bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+{
+    return deliver(cursor, msg, shared_from_this());
+}
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
 {
     assertClusterSafe();
-    allocateCredit(msg.payload);
-    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
-                          shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
+    allocateCredit(msg);
+    DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+                          consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
-    parent->deliver(record, sync);
+    const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
+
+    record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+                                         ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
+                                         acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
+                                         msg.getAnnotations(),
+                                         sync));
     if (credit.isWindowMode() || ackExpected || !acquire) {
         parent->record(record);
     }
     if (acquire && !ackExpected) {  // auto acquire && auto accept
-        msg.queue->dequeue(0, msg);
+        queue->dequeue(0 /*ctxt*/, cursor);
         record.setEnded();
     }
     if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
 }
 
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
+bool SemanticState::ConsumerImpl::filter(const Message&)
 {
     return true;
 }
 
-bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::accept(const Message& msg)
 {
     assertClusterSafe();
     // TODO aconway 2009-06-08: if we have byte & message credit but
@@ -389,21 +398,21 @@ ostream& operator<<(ostream& o, const Co
 }
 }
 
-void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
 {
     assertClusterSafe();
     Credit original = credit;
-    credit.consume(1, msg->getRequiredCredit());
+    credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << original << " now " << credit);
 
 }
 
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
 {
-    bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+    bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
     QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
-             <<  " credit for message of " << msg->getRequiredCredit() << " bytes: "
+             <<  " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
              << credit);
     return enoughCredit;
 }
@@ -421,7 +430,6 @@ void SemanticState::disable(ConsumerImpl
         session.getConnection().outputTasks.removeOutputTask(c.get());
 }
 
-
 void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
     disable(c);
@@ -435,49 +443,20 @@ void SemanticState::cancel(ConsumerImpl:
     c->cancel();
 }
 
-void SemanticState::handle(intrusive_ptr<Message> msg) {
-    if (txBuffer.get()) {
-        TxPublish* deliverable(new TxPublish(msg));
-        TxOp::shared_ptr op(deliverable);
-        route(msg, *deliverable);
-        txBuffer->enlist(op);
-    } else {
-        DeliverableMessage deliverable(msg);
-        route(msg, deliverable);
-        if (msg->isContentReleaseRequested()) {
-            // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
-            // presence of these messages). Do not change these without also checking these tests.
-            if (msg->isContentReleaseBlocked()) {
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
-            } else {
-                msg->releaseContent();
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content released");
-            }
-        }
-    }
-}
-
-namespace
+TxBuffer* SemanticState::getTxBuffer()
 {
-const std::string nullstring;
+    return txBuffer.get();
 }
 
-void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
-    msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
+void SemanticState::route(Message& msg, Deliverable& strategy) {
+    msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
 
-    std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName
-        || cacheExchange->isDestroyed())
-    {
+    std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
+    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
-    }
-    cacheExchange->setProperties(msg);
 
     /* verify the userid if specified: */
-    std::string id =
-    	msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+    std::string id = msg.getUserId();
     if (authMsg &&  !id.empty() && !session.getConnection().isAuthenticatedUser(id))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -487,9 +466,9 @@ void SemanticState::route(intrusive_ptr<
     AclModule* acl = getSession().getBroker().getAcl();
     if (acl && acl->doTransferAcl())
     {
-        if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
+        if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg.getRoutingKey() ))
             throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " <<
-                                               exchangeName << " with routing-key " << msg->getRoutingKey()));
+                                               exchangeName << " with routing-key " << msg.getRoutingKey()));
     }
 
     cacheExchange->route(strategy);
@@ -501,9 +480,6 @@ void SemanticState::route(intrusive_ptr<
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy);
         }
-        if (!strategy.delivered) {
-            msg->destroy();
-        }
     }
 
 }
@@ -543,28 +519,20 @@ void SemanticState::ConsumerImpl::comple
     }
 }
 
-void SemanticState::recover(bool requeue)
+void SemanticState::requeue()
 {
-    if(requeue){
-        //take copy and clear unacked as requeue may result in redelivery to this session
-        //which will in turn result in additions to unacked
-        DeliveryRecords copy = unacked;
-        unacked.clear();
-        for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
-    }else{
-        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
-        //unconfirmed messages re redelivered and therefore have their
-        //id adjusted, confirmed messages are not and so the ordering
-        //w.r.t id is lost
-        sort(unacked.begin(), unacked.end());
-    }
+    //take copy and clear unacked as requeue may result in redelivery to this session
+    //which will in turn result in additions to unacked
+    DeliveryRecords copy = unacked;
+    unacked.clear();
+    for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
     getSession().setUnackedCount(unacked.size());
 }
 
-void SemanticState::deliver(DeliveryRecord& msg, bool sync)
-{
-    return deliveryAdapter.deliver(msg, sync);
-}
+
+SessionContext& SemanticState::getSession() { return session; }
+const SessionContext& SemanticState::getSession() const { return session; }
+
 
 const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h Mon Aug 27 15:40:33 2012
@@ -26,7 +26,6 @@
 #include "qpid/broker/Consumer.h"
 #include "qpid/broker/Credit.h"
 #include "qpid/broker/Deliverable.h"
-#include "qpid/broker/DeliveryAdapter.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/DtxManager.h"
@@ -34,12 +33,15 @@
 #include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/TxBuffer.h"
 
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/AtomicValue.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Subscription.h"
 
 #include <list>
@@ -47,13 +49,15 @@
 #include <vector>
 
 #include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
 #include <boost/cast.hpp>
 
 namespace qpid {
 namespace broker {
 
+class Exchange;
+class MessageStore;
 class SessionContext;
+class SessionState;
 
 /**
  *
@@ -94,28 +98,28 @@ class SemanticState : private boost::non
         int deliveryCount;
         qmf::org::apache::qpid::broker::Subscription* mgmtObject;
 
-        bool checkCredit(boost::intrusive_ptr<Message>& msg);
-        void allocateCredit(boost::intrusive_ptr<Message>& msg);
+        bool checkCredit(const Message& msg);
+        void allocateCredit(const Message& msg);
         bool haveCredit();
 
       protected:
         QPID_BROKER_EXTERN virtual bool doDispatch();
         size_t unacked() { return parent->unacked.size(); }
+        QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
 
       public:
         typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
 
-        QPID_BROKER_EXTERN ConsumerImpl(
-            SemanticState* parent,
-            const std::string& name, boost::shared_ptr<Queue> queue,
-            bool ack, bool acquire, bool exclusive,
-            const std::string& tag, const std::string& resumeId, uint64_t resumeTtl,
-            const framing::FieldTable& arguments);
-        QPID_BROKER_EXTERN virtual ~ConsumerImpl();
+        QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent,
+                     const std::string& name, boost::shared_ptr<Queue> queue,
+                     bool ack, SubscriptionType type, bool exclusive,
+                     const std::string& tag, const std::string& resumeId,
+                     uint64_t resumeTtl, const framing::FieldTable& arguments);
+        QPID_BROKER_EXTERN ~ConsumerImpl();
         QPID_BROKER_EXTERN OwnershipToken* getSession();
-        QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg);
-        QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr<Message> msg);
-        QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr<Message> msg);
+        QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
+        QPID_BROKER_EXTERN bool filter(const Message&);
+        QPID_BROKER_EXTERN bool accept(const Message&);
         QPID_BROKER_EXTERN void cancel() {}
 
         QPID_BROKER_EXTERN void disableNotify();
@@ -153,7 +157,7 @@ class SemanticState : private boost::non
         SemanticState& getParent() { return *parent; }
         const SemanticState& getParent() const { return *parent; }
 
-        void acknowledged(const broker::QueuedMessage&) {}
+        void acknowledged(const DeliveryRecord&) {}
 
         // manageable entry points
         QPID_BROKER_EXTERN management::ManagementObject*
@@ -168,8 +172,7 @@ class SemanticState : private boost::non
   private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
 
-    SessionContext& session;
-    DeliveryAdapter& deliveryAdapter;
+    SessionState& session;
     ConsumerImplMap consumers;
     NameGenerator tagGenerator;
     DeliveryRecords unacked;
@@ -185,7 +188,6 @@ class SemanticState : private boost::non
     //needed for queue delete events in auto-delete:
     const std::string connectionId;
 
-    void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
     void checkDtxTimeout();
 
     bool complete(DeliveryRecord&);
@@ -196,11 +198,11 @@ class SemanticState : private boost::non
 
   public:
 
-    SemanticState(DeliveryAdapter&, SessionContext&);
+    SemanticState(SessionState&);
     ~SemanticState();
 
-    SessionContext& getSession() { return session; }
-    const SessionContext& getSession() const { return session; }
+    SessionContext& getSession();
+    const SessionContext& getSession() const;
 
     const ConsumerImpl::shared_ptr find(const std::string& destination) const;
     bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
@@ -239,12 +241,12 @@ class SemanticState : private boost::non
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
-    void recover(bool requeue);
-    void deliver(DeliveryRecord& message, bool sync);
+    TxBuffer* getTxBuffer();
+    void requeue();
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
-    void handle(boost::intrusive_ptr<Message> msg);
+    void route(Message& msg, Deliverable& strategy);
 
     void completed(const framing::SequenceSet& commands);
     void accepted(const framing::SequenceSet& commands);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp Mon Aug 27 15:40:33 2012
@@ -264,7 +264,7 @@ QueueQueryResult SessionAdapter::QueueHa
                                 queue->isDurable(),
                                 queue->hasExclusiveOwner(),
                                 queue->isAutoDelete(),
-                                queue->getSettings(),
+                                queue->getEncodableSettings(),
                                 queue->getMessageCount(),
                                 queue->getConsumerCount());
     } else {
@@ -294,19 +294,24 @@ void SessionAdapter::QueueHandlerImpl::d
         queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
+        QueueSettings settings(durable, autoDelete);
+        try {
+            settings.populate(arguments, settings.storeSettings);
+        } catch (const qpid::types::Exception& e) {
+            throw InvalidArgumentException(e.what());
+        }
+
         std::pair<Queue::shared_ptr, bool> queue_created =
-            getBroker().createQueue(name, durable,
-                                    autoDelete,
+            getBroker().createQueue(name, settings,
                                     exclusive ? &session : 0,
                                     alternateExchange,
-                                    arguments,
                                     getConnection().getUserId(),
                                     getConnection().getUrl());
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
             //handle automatic cleanup:
-            if (exclusive) {
+            if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
         } else {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp Mon Aug 27 15:40:33 2012
@@ -32,7 +32,7 @@ using namespace std;
 using namespace qpid::sys;
 
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
-    : amqp_0_10::SessionHandler(&c.getOutput(), ch),
+    : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c),
       proxy(out),
       clusterOrderProxy(c.getClusterOrderOutput() ?
@@ -75,7 +75,7 @@ ConnectionState& SessionHandler::getConn
 const ConnectionState& SessionHandler::getConnection() const { return connection; }
 
 void SessionHandler::handleDetach() {
-    amqp_0_10::SessionHandler::handleDetach();
+    qpid::amqp_0_10::SessionHandler::handleDetach();
     assert(&connection.getChannel(channel.get()) == this);
     if (session.get())
         connection.getBroker().getSessionManager().detach(session);
@@ -125,7 +125,7 @@ void SessionHandler::attached(const std:
 {
     if (session.get()) {
         session->addManagementObject(); // Delayed from attachAs()
-        amqp_0_10::SessionHandler::attached(name);
+        qpid::amqp_0_10::SessionHandler::attached(name);
     } else {
         SessionId id(connection.getUserId(), name);
         SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h Mon Aug 27 15:40:33 2012
@@ -41,7 +41,7 @@ class SessionState;
  * receives incoming frames, handles session controls and manages the
  * association between the channel and a session.
  */
-class SessionHandler : public amqp_0_10::SessionHandler {
+class SessionHandler : public qpid::amqp_0_10::SessionHandler {
   public:
     class ErrorListener {
       public:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp Mon Aug 27 15:40:33 2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/SessionHandler.h"
@@ -28,6 +29,7 @@
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
@@ -55,9 +57,8 @@ SessionState::SessionState(
     const SessionState::Configuration& config, bool delayManagement)
     : qpid::SessionState(id, config),
       broker(b), handler(&h),
-      semanticState(*this, *this),
+      semanticState(*this),
       adapter(semanticState),
-      msgBuilder(&broker.getStore()),
       mgmtObject(0),
       asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
@@ -208,7 +209,7 @@ void SessionState::handleContent(AMQFram
 {
     if (frame.getBof() && frame.getBos()) //start of frameset
         msgBuilder.start(id);
-    intrusive_ptr<Message> msg(msgBuilder.getMessage());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
     msgBuilder.handle(frame);
     if (frame.getEof() && frame.getEos()) {//end of frameset
         if (frame.getBof()) {
@@ -218,13 +219,16 @@ void SessionState::handleContent(AMQFram
             header.setEof(false);
             msg->getFrames().append(header);
         }
+        DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
         if (broker.isTimestamping())
-            msg->setTimestamp();
-        msg->setPublisher(&getConnection());
+            deliverable.getMessage().setTimestamp();
+        deliverable.getMessage().setPublisher(&getConnection());
+
+
+        IncompleteIngressMsgXfer xfer(this, msg);
         msg->getIngressCompletion().begin();
-        semanticState.handle(msg);
+        semanticState.route(deliverable.getMessage(), deliverable);
         msgBuilder.end();
-        IncompleteIngressMsgXfer xfer(this, msg);
         msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
     }
 }
@@ -294,18 +298,28 @@ void SessionState::handleOut(AMQFrame& f
     handler->out(frame);
 }
 
-void SessionState::deliver(DeliveryRecord& msg, bool sync)
+DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+                                 const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+                                 qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
+                                 const qpid::types::Variant::Map& annotations, bool sync)
 {
     uint32_t maxFrameSize = getConnection().getFrameMax();
     assert(senderGetCommandPoint().offset == 0);
     SequenceNumber commandId = senderGetCommandPoint().command;
-    msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
+
+    framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
+    method.setEof(false);
+    getProxy().getHandler().handle(method);
+    message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+    message.sendContent(getProxy().getHandler(), maxFrameSize);
+
     assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
     if (sync) {
         AMQP_ClientProxy::Execution& p(getProxy().getExecution());
         Proxy::ScopedSync s(p);
         p.sync();
     }
+    return commandId;
 }
 
 void SessionState::sendCompletion() {
@@ -349,7 +363,6 @@ void SessionState::addPendingExecutionSy
     }
 }
 
-
 /** factory for creating a reference-counted IncompleteIngressMsgXfer object
  * which will be attached to a message that will be completed asynchronously.
  */
@@ -408,10 +421,10 @@ void SessionState::AsyncCommandCompleter
 
 
 /** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+    std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
     bool unique = pendingMsgs.insert(item).second;
     if (!unique) {
       assert(false);
@@ -430,13 +443,13 @@ void SessionState::AsyncCommandCompleter
 /** done when an execution.sync arrives */
 void SessionState::AsyncCommandCompleter::flushPendingMessages()
 {
-    std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+    std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
     {
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
         pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
     }
     // drop lock, so it is safe to call "flush()"
-    for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+    for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
          i != copy.end(); ++i) {
         i->second->flush();
     }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h Mon Aug 27 15:40:33 2012
@@ -23,17 +23,18 @@
  */
 
 #include "qpid/SessionState.h"
+#include "qpid/framing/enum.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/Time.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Session.h"
 #include "qpid/broker/SessionAdapter.h"
-#include "qpid/broker/DeliveryAdapter.h"
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/sys/Monitor.h"
 
 #include <boost/noncopyable.hpp>
@@ -58,7 +59,6 @@ namespace broker {
 
 class Broker;
 class ConnectionState;
-class Message;
 class SessionHandler;
 class SessionManager;
 
@@ -68,7 +68,6 @@ class SessionManager;
  */
 class SessionState : public qpid::SessionState,
                      public SessionContext,
-                     public DeliveryAdapter,
                      public management::Manageable,
                      public framing::FrameHandler::InOutHandler
 {
@@ -105,8 +104,10 @@ class SessionState : public qpid::Sessio
 
     void sendCompletion();
 
-    //delivery adapter methods:
-    void deliver(DeliveryRecord&, bool sync);
+    DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+                       const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+                       qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
+                       const qpid::types::Variant::Map& annotations, bool sync);
 
     // Manageable entry points
     management::ManagementObject* GetManagementObject (void) const;
@@ -117,7 +118,7 @@ class SessionState : public qpid::Sessio
 
     // Used by cluster to create replica sessions.
     SemanticState& getSemanticState() { return semanticState; }
-    boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessageInProgress() { return msgBuilder.getMessage(); }
     SessionAdapter& getSessionAdapter() { return adapter; }
 
     const SessionId& getSessionId() const { return getId(); }
@@ -199,7 +200,7 @@ class SessionState : public qpid::Sessio
         // If an ingress message does not require a Sync, we need to
         // hold a reference to it in case an Execution.Sync command is received and we
         // have to manually flush the message.
-        std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+        std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > pendingMsgs;
 
         /** complete all pending commands, runs in IO thread */
         void completeCommands();
@@ -212,7 +213,7 @@ class SessionState : public qpid::Sessio
         ~AsyncCommandCompleter() {};
 
         /** track a message pending ingress completion */
-        void addPendingMessage(boost::intrusive_ptr<Message> m);
+        void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
         void deletePendingMessage(SequenceNumber id);
         void flushPendingMessages();
         /** schedule the processing of a completed ingress message.transfer command */
@@ -246,29 +247,29 @@ class SessionState : public qpid::Sessio
     {
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
-                                  boost::intrusive_ptr<Message> m )
+                                  boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
           : AsyncCommandContext(ss, m->getCommandId()),
-          session(ss),
-          msg(m),
-          requiresAccept(m->requiresAccept()),
-          requiresSync(m->getFrames().getMethod()->isSync()),
-          pending(false) {}
+            session(ss),
+            msg(m),
+            requiresAccept(m->requiresAccept()),
+            requiresSync(m->getFrames().getMethod()->isSync()),
+            pending(false) {}
         IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
           : AsyncCommandContext(x.session, x.msg->getCommandId()),
-          session(x.session),
-          msg(x.msg),
-          requiresAccept(x.requiresAccept),
-          requiresSync(x.requiresSync),
-          pending(x.pending) {}
+            session(x.session),
+            msg(x.msg),
+            requiresAccept(x.requiresAccept),
+            requiresSync(x.requiresSync),
+            pending(x.pending) {}
 
-  virtual ~IncompleteIngressMsgXfer() {};
+        virtual ~IncompleteIngressMsgXfer() {};
 
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
      private:
         SessionState *session;  // only valid if sync flag in callback is true
-        boost::intrusive_ptr<Message> msg;
+        boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg;
         bool requiresAccept;
         bool requiresSync;
         bool pending;   // true if msg saved on pending list...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp Mon Aug 27 15:40:33 2012
@@ -28,6 +28,17 @@
 namespace qpid  {
 namespace broker {
 
+SimpleMessage::SimpleMessage() {}
+
+SimpleMessage::SimpleMessage(const char* msgData,
+                             const uint32_t msgSize,
+                             boost::intrusive_ptr<PersistableMessage> persistentContext) :
+        m_msg(msgData, static_cast<size_t>(msgSize)),
+        m_persistentContext(persistentContext)
+{}
+
+
+/*
 SimpleMessage::SimpleMessage(const char* msgData,
                              const uint32_t msgSize) :
         m_persistenceId(0ULL),
@@ -44,24 +55,28 @@ SimpleMessage::SimpleMessage(const char*
         m_store(store),
         m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle())
 {}
+*/
 
 SimpleMessage::~SimpleMessage() {}
 
+/*
 const MessageHandle&
 SimpleMessage::getHandle() const {
-    return m_msgHandle;
+    return m_persistentContext.getHandle();
 }
 
 MessageHandle&
 SimpleMessage::getHandle() {
-    return m_msgHandle;
+    return m_persistentContext.getHandle();
 }
+*/
 
 uint64_t
 SimpleMessage::contentSize() const {
     return  static_cast<uint64_t>(m_msg.size());
 }
 
+/*
 void
 SimpleMessage::setPersistenceId(uint64_t id) const {
     m_persistenceId = id;
@@ -89,12 +104,18 @@ uint32_t
 SimpleMessage::encodedHeaderSize() const {
     return 0;
 }
-
+*/
 bool
 SimpleMessage::isPersistent() const {
-    return m_store != 0;
+    return m_persistentContext.get() != 0;
 }
 
+boost::intrusive_ptr<PersistableMessage>
+SimpleMessage::getPersistentContext() const {
+    return m_persistentContext;
+}
+
+
 uint64_t
 SimpleMessage::getSize() {
     return m_msg.size();

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h Mon Aug 27 15:40:33 2012
@@ -25,47 +25,51 @@
 #define qpid_broker_SimpleMessage_h_
 
 #include "AsyncStore.h" // DataSource
-#include "MessageHandle.h"
+//#include "MessageHandle.h"
 #include "PersistableMessage.h"
 
 namespace qpid  {
 namespace broker {
 
-class SimpleMessage: public PersistableMessage,
-                     public DataSource
+class SimpleMessage: /*public PersistableMessage,*/
+                     public DataSource,
+                     public RefCounted
 {
 public:
-    SimpleMessage(const char* msgData,
-                  const uint32_t msgSize);
+    SimpleMessage();
     SimpleMessage(const char* msgData,
                   const uint32_t msgSize,
-                  AsyncStore* store);
+                  boost::intrusive_ptr<PersistableMessage> persistentContext);
     virtual ~SimpleMessage();
-    const MessageHandle& getHandle() const;
-    MessageHandle& getHandle();
+//    const MessageHandle& getHandle() const;
+//    MessageHandle& getHandle();
     uint64_t contentSize() const;
 
-    // --- Interface Persistable ---
-    virtual void setPersistenceId(uint64_t id) const;
-    virtual uint64_t getPersistenceId() const;
-    virtual void encode(qpid::framing::Buffer& buffer) const;
-    virtual uint32_t encodedSize() const;
-
-    // --- Interface PersistableMessage ---
-    virtual void allDequeuesComplete();
-    virtual uint32_t encodedHeaderSize() const;
-    virtual bool isPersistent() const;
+//    // --- Interface Persistable ---
+//    virtual void setPersistenceId(uint64_t id) const;
+//    virtual uint64_t getPersistenceId() const;
+//    virtual void encode(qpid::framing::Buffer& buffer) const;
+//    virtual uint32_t encodedSize() const;
+//
+//    // --- Interface PersistableMessage ---
+//    virtual void allDequeuesComplete();
+//    virtual uint32_t encodedHeaderSize() const;
+
+    // Persistent operations
+    bool isPersistent() const;
+    boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
 
     // --- Interface DataSource ---
     virtual uint64_t getSize(); // <- same as encodedSize()?
     virtual void write(char* target);
 
 private:
-    mutable uint64_t m_persistenceId;
+//    mutable uint64_t m_persistenceId;
     const std::string m_msg;
-    AsyncStore* m_store;
+    boost::intrusive_ptr<PersistableMessage> m_persistentContext;
+//    AsyncStore* m_store;
 
-    MessageHandle m_msgHandle;
+//    MessageHandle m_msgHandle;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp Mon Aug 27 15:40:33 2012
@@ -171,7 +171,7 @@ SimpleQueue::enqueue(SimpleTxnBuffer* tb
         return false;
     }
     if (qm->payload()->isPersistent() && m_store) {
-        qm->payload()->enqueueAsync(shared_from_this(), m_store);
+        qm->payload()->getPersistentContext()->enqueueAsync(shared_from_this(), m_store);
         return asyncEnqueue(tb, qm);
     }
     return false;
@@ -190,7 +190,7 @@ SimpleQueue::dequeue(SimpleTxnBuffer* tb
         return false;
     }
     if (qm->payload()->isPersistent() && m_store) {
-        qm->payload()->dequeueAsync(shared_from_this(), m_store);
+        qm->payload()->getPersistentContext()->dequeueAsync(shared_from_this(), m_store);
         return asyncDequeue(tb, qm);
     }
     return true;
@@ -316,7 +316,7 @@ SimpleQueue::asyncEnqueue(SimpleTxnBuffe
                           boost::shared_ptr<SimpleQueuedMessage> qm) {
     assert(qm.get());
     boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
-                                                                   qm->payload(),
+                                                                   /*qm->payload(),*/
                                                                    tb,
                                                                    &handleAsyncEnqueueResult,
                                                                    &m_resultQueue));
@@ -353,7 +353,7 @@ SimpleQueue::asyncDequeue(SimpleTxnBuffe
                           boost::shared_ptr<SimpleQueuedMessage> qm) {
     assert(qm.get());
     boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
-                                                                   qm->payload(),
+                                                                   /*qm->payload(),*/
                                                                    tb,
                                                                    &handleAsyncDequeueResult,
                                                                    &m_resultQueue));

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp Mon Aug 27 15:40:33 2012
@@ -40,7 +40,7 @@ SimpleQueuedMessage::SimpleQueuedMessage
         m_msg(msg)
 {
     if (m_queue->getStore()) {
-        m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
+        m_enqHandle = q->getStore()->createEnqueueHandle(msg->getPersistentContext()->getMessageHandle(), q->getHandle());
     }
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp Mon Aug 27 15:40:33 2012
@@ -20,7 +20,8 @@
  */
 #include "qpid/broker/ThresholdAlerts.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
@@ -30,21 +31,20 @@ namespace qpid {
 namespace broker {
 namespace {
 const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
-bool isQMFv2(const boost::intrusive_ptr<Message> message)
+bool isQMFv2(const Message& message)
 {
-    const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
+    const qpid::framing::MessageProperties* props = qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<qpid::framing::MessageProperties>();
     return props && props->getAppId() == "qmf2";
 }
 
-bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
+bool isThresholdEvent(const Message& message)
 {
-    if (message->getIsManagementMessage()) {
+    if (message.getIsManagementMessage()) {
         //is this a qmf event? if so is it a threshold event?
         if (isQMFv2(message)) {
-            const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
-            if (headers && headers->getAsString("qmf.content") == "_event") {
+            if (message.getPropertyAsString("qmf.content") == "_event") {
                 //decode as list
-                std::string content = message->getFrames().getContent();
+                std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
                 qpid::types::Variant::List list;
                 qpid::amqp_0_10::ListCodec::decode(content, list);
                 if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
@@ -57,7 +57,7 @@ bool isThresholdEvent(const boost::intru
                 }
             }
         } else {
-            std::string content = message->getFrames().getContent();
+            std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
             qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
             if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
                 buffer.getLong();//sequence
@@ -83,9 +83,9 @@ ThresholdAlerts::ThresholdAlerts(const s
       repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
       count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
 
-void ThresholdAlerts::enqueued(const QueuedMessage& m)
+void ThresholdAlerts::enqueued(const Message& m)
 {
-    size += m.payload->contentSize();
+    size += m.getContentSize();
     ++count;
     if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
         if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
@@ -94,7 +94,7 @@ void ThresholdAlerts::enqueued(const Que
             //enqueued on queues; it may even be that this event
             //causes a message to be enqueued on the queue we are
             //tracking, and so we need to avoid recursing
-            if (isThresholdEvent(m.payload)) return;
+            if (isThresholdEvent(m)) return;
             lastAlert = qpid::sys::now();
             agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
             QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
@@ -102,9 +102,9 @@ void ThresholdAlerts::enqueued(const Que
     }
 }
 
-void ThresholdAlerts::dequeued(const QueuedMessage& m)
+void ThresholdAlerts::dequeued(const Message& m)
 {
-    size -= m.payload->contentSize();
+    size -= m.getContentSize();
     --count;
     if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
         lastAlert = qpid::sys::EPOCH;
@@ -127,65 +127,14 @@ void ThresholdAlerts::observe(Queue& que
 }
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::framing::FieldTable& settings, uint16_t limitRatio)
-
-{
-    qpid::types::Variant::Map map;
-    qpid::amqp_0_10::translate(settings, map);
-    observe(queue, agent, map, limitRatio);
-}
-
-template <class T>
-class Option
+                              const QueueSettings& settings, uint16_t limitRatio)
 {
-  public:
-    Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
-    void addAlias(const std::string& name) { names.push_back(name); }
-    T get(const qpid::types::Variant::Map& settings) const
-    {
-        T value(defaultValue);
-        for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
-            if (get(settings, *i, value)) break;
-        }
-        return value;
-    }
-  private:
-    std::vector<std::string> names;
-    T defaultValue;
-
-    bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
-    {
-        qpid::types::Variant::Map::const_iterator i = settings.find(name);
-        if (i != settings.end()) {
-            try {
-                value = (T) i->second;
-            } catch (const qpid::types::InvalidConversion&) {
-                QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
-            }
-            return true;
-        } else {
-            return false;
-        }
-    }
-};
-
-void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::types::Variant::Map& settings, uint16_t limitRatio)
-
-{
-    //Note: aliases are keys defined by java broker
-    Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
-    repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
-
     //If no explicit threshold settings were given use specified
     //percentage of any limit from the policy.
-    const QueuePolicy* policy = queue.getPolicy();
-    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
-    countThreshold.addAlias("x-qpid-maximum-message-count");
-    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
-    sizeThreshold.addAlias("x-qpid-maximum-message-size");
+    uint32_t countThreshold = settings.alertThreshold.hasCount() ? settings.alertThreshold.getCount() : (settings.maxDepth.getCount()*limitRatio/100);
+    uint32_t sizeThreshold = settings.alertThreshold.hasSize() ? settings.alertThreshold.getSize() : (settings.maxDepth.getSize()*limitRatio/100);
 
-    observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+    observe(queue, agent, countThreshold, sizeThreshold, settings.alertRepeatInterval);
 }
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h Mon Aug 27 15:40:33 2012
@@ -27,15 +27,13 @@
 #include <string>
 
 namespace qpid {
-namespace framing {
-class FieldTable;
-}
 namespace management {
 class ManagementAgent;
 }
 namespace broker {
 
 class Queue;
+struct QueueSettings;
 /**
  * Class to manage generation of QMF alerts when particular thresholds
  * are breached on a queue.
@@ -48,19 +46,17 @@ class ThresholdAlerts : public QueueObse
                     const uint32_t countThreshold,
                     const uint64_t sizeThreshold,
                     const long repeatInterval);
-    void enqueued(const QueuedMessage&);
-    void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {};
-    void requeued(const QueuedMessage&) {};
+    void enqueued(const Message&);
+    void dequeued(const Message&);
+    void acquired(const Message&) {};
+    void requeued(const Message&) {};
 
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
                         const uint64_t countThreshold,
                         const uint64_t sizeThreshold,
                         const long repeatInterval);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::framing::FieldTable& settings, uint16_t limitRatio);
-    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::types::Variant::Map& settings, uint16_t limitRatio);
+                        const QueueSettings& settings, uint16_t limitRatio);
   private:
     const std::string name;
     qpid::management::ManagementAgent& agent;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h Mon Aug 27 15:40:33 2012
@@ -71,7 +71,6 @@ namespace qpid {
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~TxAccept(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
             // Used by cluster replication.
             const framing::SequenceSet& getAcked() const { return acked; }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp Mon Aug 27 15:40:33 2012
@@ -28,7 +28,7 @@ using namespace qpid::broker;
 
 bool TxBuffer::prepare(TransactionContext* const ctxt)
 {
-    for(op_iterator i = ops.begin(); i < ops.end(); i++){
+    for(op_iterator i = ops.begin(); i != ops.end(); i++){
         if(!(*i)->prepare(ctxt)){
             return false;
         }
@@ -74,7 +74,3 @@ bool TxBuffer::commitLocal(Transactional
     }
     return false;
 }
-
-void TxBuffer::accept(TxOpConstVisitor& v) const {
-    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
-}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org