You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [5/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueDepth.h"
+
+namespace qpid {
+namespace broker {
+
+QueueDepth::QueueDepth() {} // leaves both count and size unset
+QueueDepth::QueueDepth(uint32_t c, uint64_t s) : count(c), size(s) {} // sets both count and size
+QueueDepth& QueueDepth::operator+=(const QueueDepth& other)
+{
+    size.value += size.valid ? other.size.value : 0; // a component unset on *this is left untouched
+    count.value += count.valid ? other.count.value : 0;
+    return *this;
+}
+QueueDepth& QueueDepth::operator-=(const QueueDepth& other)
+{
+    size.value -= size.valid ? other.size.value : 0; // a component unset on *this is left untouched
+    count.value -= count.valid ? other.count.value : 0;
+    return *this;
+}
+bool QueueDepth::operator==(const QueueDepth& other) const
+{
+    // Validity is deliberately not compared, only the values. An unset
+    // component always holds 0, so an unset depth compares equal to an
+    // empty (zero) depth, which is fine.
+    const bool sameCount = (count.value == other.count.value);
+    return sameCount && (size.value == other.size.value);
+}
+bool QueueDepth::operator!=(const QueueDepth& other) const
+{
+    return !operator==(other); // exact negation of operator==
+}
+bool QueueDepth::operator<(const QueueDepth& other) const
+{
+    // Which components take part depends on what is set on *this; when
+    // both are set, being lower on either one is enough.
+    const bool countLess = count.value < other.count.value;
+    const bool sizeLess = size.value < other.size.value;
+    if (!count.valid) return sizeLess;
+    return size.valid ? (countLess || sizeLess) : countLess;
+}
+bool QueueDepth::operator>(const QueueDepth& other) const
+{
+    // Which components take part depends on what is set on *this; when
+    // both are set, being higher on either one is enough.
+    const bool countMore = count.value > other.count.value;
+    const bool sizeMore = size.value > other.size.value;
+    if (!count.valid) return sizeMore;
+    return size.valid ? (countMore || sizeMore) : countMore;
+}
+bool QueueDepth::hasCount() const { return count.valid; } // true once a count has been set
+uint32_t QueueDepth::getCount() const { return count.value; } // raw value, returned whether or not it is set
+void QueueDepth::setCount(uint32_t c) { count.value = c; count.valid = true; } // also marks the count as set
+bool QueueDepth::hasSize() const { return size.valid; } // true once a size has been set
+uint64_t QueueDepth::getSize() const { return size.value; } // raw value, returned whether or not it is set
+void QueueDepth::setSize(uint64_t c) { size.value = c; size.valid = true; } // also marks the size as set
+
+namespace{
+    // File-local helpers used by the binary operator+ and operator-
+    // defined after this namespace. Each combines one component of two
+    // depths: the result is set only when both inputs are set, otherwise
+    // a default-constructed (unset, value 0) Optional is returned.
+    template <typename T> QueueDepth::Optional<T> add(const QueueDepth::Optional<T>& a, const QueueDepth::Optional<T>& b)
+    {
+        typedef QueueDepth::Optional<T> Component;
+        if (!a.valid || !b.valid) return Component();
+        return Component(a.value + b.value);
+    }
+
+    // Same rule as add(), for the difference a - b.
+    template <typename T> QueueDepth::Optional<T> subtract(const QueueDepth::Optional<T>& a, const QueueDepth::Optional<T>& b)
+    {
+        typedef QueueDepth::Optional<T> Component;
+        if (!a.valid || !b.valid) return Component();
+        return Component(a.value - b.value);
+    }
+}
+QueueDepth operator-(const QueueDepth& a, const QueueDepth& b)
+{
+    QueueDepth difference; // a component stays unset unless it is set on both operands
+    difference.size = subtract(a.size, b.size);
+    difference.count = subtract(a.count, b.count);
+    return difference;
+}
+
+QueueDepth operator+(const QueueDepth& a, const QueueDepth& b)
+{
+    // A component of the sum stays unset unless it is set on both operands.
+    QueueDepth sum;
+    sum.size = add(a.size, b.size);
+    sum.count = add(a.count, b.count);
+    return sum;
+}
+
+std::ostream& operator<<(std::ostream& o, const QueueDepth& d)
+{
+    // Prints only the components that are set, comma separated when both are.
+    const bool withCount = d.hasCount();
+    if (withCount) o << "count: " << d.getCount();
+    if (!d.hasSize()) return o;
+    if (withCount) o << ", ";
+    return o << "size: " << d.getSize();
+}
+
+QueueDepth::operator bool() const { return hasCount() || hasSize(); } // true when at least one component is set
+
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,74 @@
+#ifndef QPID_BROKER_QUEUEDEPTH_H
+#define QPID_BROKER_QUEUEDEPTH_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Represents a queue depth in message count and/or aggregate message
+ * size.
+ */
+class QueueDepth
+{
+  public:
+    QPID_BROKER_EXTERN QueueDepth(); // neither count nor size set
+    QPID_BROKER_EXTERN QueueDepth(uint32_t count, uint64_t size); // both count and size set
+    QPID_BROKER_EXTERN QueueDepth& operator+=(const QueueDepth&); // only changes components set on *this
+    QPID_BROKER_EXTERN QueueDepth& operator-=(const QueueDepth&); // only changes components set on *this
+    QPID_BROKER_EXTERN bool operator==(const QueueDepth&) const; // compares values only, not whether they are set
+    QPID_BROKER_EXTERN bool operator!=(const QueueDepth&) const;
+    QPID_BROKER_EXTERN bool operator<(const QueueDepth& other) const; // with both set on *this: count OR size lower
+    QPID_BROKER_EXTERN bool operator>(const QueueDepth& other) const; // with both set on *this: count OR size higher
+    QPID_BROKER_EXTERN operator bool() const; // true if count or size is set
+    QPID_BROKER_EXTERN bool hasCount() const;
+    QPID_BROKER_EXTERN uint32_t getCount() const; // raw value; use hasCount() to tell whether it is set
+    QPID_BROKER_EXTERN void setCount(uint32_t); // stores the value and marks the count as set
+    QPID_BROKER_EXTERN bool hasSize() const;
+    QPID_BROKER_EXTERN uint64_t getSize() const; // raw value; use hasSize() to tell whether it is set
+    QPID_BROKER_EXTERN void setSize(uint64_t); // stores the value and marks the size as set
+  friend QPID_BROKER_EXTERN QueueDepth operator-(const QueueDepth&, const QueueDepth&);
+  friend QPID_BROKER_EXTERN QueueDepth operator+(const QueueDepth&, const QueueDepth&);
+    template <typename T> struct Optional // a value plus a flag saying whether it is set
+    {
+        T value;
+        bool valid;
+
+        Optional(T v) : value(v), valid(true) {} // set to v
+        Optional() : value(0), valid(false) {} // unset, value 0
+    };
+  private:
+    Optional<uint32_t> count; // number of messages
+    Optional<uint64_t> size; // aggregate message size
+};
+
+QPID_BROKER_EXTERN QueueDepth operator-(const QueueDepth&, const QueueDepth&);
+QPID_BROKER_EXTERN QueueDepth operator+(const QueueDepth&, const QueueDepth&);
+QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueDepth&);
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUEDEPTH_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueFactory.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/LossyQueue.h"
+#include "qpid/broker/Lvq.h"
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDistributor.h"
+#include "qpid/broker/MessageGroupManager.h"
+#include "qpid/broker/Fairshare.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
+#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/FifoDistributor.h"
+#include <map>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+
+QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} // starts with no broker, store or parent
+
+boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
+{
+    settings.validate(); // done first, before anything is constructed
+    // Note: in every branch below the store is only passed on when settings.durable is set.
+    //1. determine Queue type (i.e. whether we are subclassing Queue)
+    // -> LossyQueue if messages are dropped at the limit ('ring' policy), else Lvq if an LVQ key is set
+    boost::shared_ptr<Queue> queue;
+    if (settings.dropMessagesAtLimit) {
+        queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+    } else if (settings.lvqKey.size()) {
+        std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey));
+        queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker));
+    } else {
+        queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker));
+    }
+    // NOTE(review): the priorities branch below is also taken when an LVQ key is set - confirm validate() rules that out
+    //2. determine Messages type (i.e. structure)
+    if (settings.priorities) {
+        if (settings.defaultFairshare || settings.fairshare.size()) {
+            queue->messages = Fairshare::create(settings);
+        } else {
+            queue->messages = std::auto_ptr<Messages>(new PriorityQueue(settings.priorities));
+        }
+    } else if (settings.lvqKey.empty()) {//LVQ already handled above
+        queue->messages = std::auto_ptr<Messages>(new MessageDeque());
+    }
+
+    //3. determine MessageDistributor type (both kinds are built on queue->messages, so this must follow step 2)
+    if (settings.groupKey.size()) {
+        boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( name, *(queue->messages), settings));
+        queue->allocator = mgm;
+        queue->addObserver(mgm); // the group manager is registered as allocator and as observer
+    } else {
+        queue->allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *(queue->messages) ));
+    }
+
+
+    //4. threshold event config (skipped when there is no broker or no management agent)
+    if (broker && broker->getManagementAgent()) {
+        ThresholdAlerts::observe(*queue, *(broker->getManagementAgent()), settings, broker->getOptions().queueThresholdEventRatio);
+    }
+    //5. flow control config
+    QueueFlowLimit::observe(*queue, settings);
+
+    return queue;
+}
+
+void QueueFactory::setBroker(Broker* b)
+{
+    broker = b; // passed to each queue that create() builds afterwards
+}
+Broker* QueueFactory::getBroker()
+{
+    return broker; // null unless setBroker() has supplied one
+}
+void QueueFactory::setStore (MessageStore* s)
+{
+    store = s; // plain assignment; create() hands it only to durable queues
+}
+MessageStore* QueueFactory::getStore() const
+{
+    return store; // null unless setStore() has supplied one
+}
+void QueueFactory::setParent(management::Manageable* p)
+{
+    parent = p; // passed to each queue that create() builds afterwards
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,73 @@
+#ifndef QPID_BROKER_QUEUEFACTORY_H
+#define QPID_BROKER_QUEUEFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace management {
+class Manageable;
+}
+namespace broker {
+class Broker;
+class MessageStore;
+class Queue;
+struct QueueSettings;
+
+/**
+ * Handles the creation and configuration of a Queue instance in order
+ * to meet the required settings
+ */
+class QueueFactory
+{
+  public:
+    QPID_BROKER_EXTERN QueueFactory();
+    /** Validate the settings, then build a queue configured to match them */
+    QPID_BROKER_EXTERN boost::shared_ptr<Queue> create(const std::string& name, const QueueSettings& settings);
+    /** The broker is passed to each queue created; it is null until set */
+    void setBroker(Broker*);
+    Broker* getBroker();
+
+    /**
+     * Set the store to use.  Intended to be called only once (setStore() itself does not check this).
+     */
+    void setStore (MessageStore*);
+
+    /**
+     * Return the message store used.
+     */
+    MessageStore* getStore() const;
+
+    /**
+     * Register the manageable parent for declared queues
+     */
+    void setParent(management::Manageable*);
+  private:
+    Broker* broker; // null until setBroker()
+    MessageStore* store; // null until setStore(); only given to durable queues
+    management::Manageable* parent; // null until setParent()
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUEFACTORY_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Fri Aug 10 12:04:27 2012
@@ -20,7 +20,9 @@
  */
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -57,34 +59,6 @@ namespace {
                                                     << "=" << max));
         }
     }
-
-    /** extract a capacity value as passed in an argument map
-     */
-    uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
-    {
-        FieldTable::ValuePtr v = settings.get(key);
-
-        int64_t result = 0;
-
-        if (!v) return defaultValue;
-        if (v->getType() == 0x23) {
-            QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
-        } else if (v->getType() == 0x33) {
-            QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
-        } else if (v->convertsTo<int64_t>()) {
-            result = v->get<int64_t>();
-            QPID_LOG(debug, "Got integer value for " << key << ": " << result);
-            if (result >= 0) return result;
-        } else if (v->convertsTo<std::string>()) {
-            std::string s(v->get<std::string>());
-            QPID_LOG(debug, "Got string value for " << key << ": " << s);
-            std::istringstream convert(s);
-            if (convert >> result && result >= 0) return result;
-        }
-
-        QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
-        return defaultValue;
-    }
 }
 
 
@@ -102,10 +76,8 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
 
     if (queue) {
         queueName = _queue->getName();
-        if (queue->getPolicy()) {
-            maxSize = _queue->getPolicy()->getMaxSize();
-            maxCount = _queue->getPolicy()->getMaxCount();
-        }
+        if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
+        if (queue->getSettings().maxDepth.hasSize()) maxSize = queue->getSettings().maxDepth.getSize();
         broker = queue->getBroker();
         queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
         if (queueMgmtObj) {
@@ -125,23 +97,23 @@ QueueFlowLimit::~QueueFlowLimit()
     sys::Mutex::ScopedLock l(indexLock);
     if (!index.empty()) {
         // we're gone - release all pending msgs
-        for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+        for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
              itr != index.end(); ++itr)
             if (itr->second)
                 try {
-                    itr->second->getIngressCompletion().finishCompleter();
+                    itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
                 } catch (...) {}    // ignore - not safe for a destructor to throw.
         index.clear();
     }
 }
 
 
-void QueueFlowLimit::enqueued(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const Message& msg)
 {
     sys::Mutex::ScopedLock l(indexLock);
 
     ++count;
-    size += msg.payload->contentSize();
+    size += msg.getContentSize();
 
     if (!flowStopped) {
         if (flowStopCount && count > flowStopCount) {
@@ -160,13 +132,13 @@ void QueueFlowLimit::enqueued(const Queu
     if (flowStopped || !index.empty()) {
         // ignore flow control if we are populating the queue due to cluster replication:
         if (broker && broker->isClusterUpdatee()) {
-            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
             return;
         }
-        QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
-        msg.payload->getIngressCompletion().startCompleter();    // don't complete until flow resumes
+        QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
+        msg.getPersistentContext()->getIngressCompletion().startCompleter();    // don't complete until flow resumes
         bool unique;
-        unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+        unique = index.insert(std::pair<framing::SequenceNumber, Message >(msg.getSequence(), msg)).second;
         // Like this to avoid tripping up unused variable warning when NDEBUG set
         if (!unique) assert(unique);
     }
@@ -174,7 +146,7 @@ void QueueFlowLimit::enqueued(const Queu
 
 
 
-void QueueFlowLimit::dequeued(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const Message& msg)
 {
     sys::Mutex::ScopedLock l(indexLock);
 
@@ -184,7 +156,7 @@ void QueueFlowLimit::dequeued(const Queu
         throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
     }
 
-    uint64_t _size = msg.payload->contentSize();
+    uint64_t _size = msg.getContentSize();
     if (_size <= size) {
         size -= _size;
     } else {
@@ -203,16 +175,16 @@ void QueueFlowLimit::dequeued(const Queu
     if (!index.empty()) {
         if (!flowStopped) {
             // flow enabled - release all pending msgs
-            for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+            for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
                  itr != index.end(); ++itr)
                 if (itr->second)
-                    itr->second->getIngressCompletion().finishCompleter();
+                    itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
             index.clear();
         } else {
             // even if flow controlled, we must release this msg as it is being dequeued
-            std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
+            std::map<framing::SequenceNumber, Message >::iterator itr = index.find(msg.getSequence());
             if (itr != index.end()) {       // this msg is flow controlled, release it:
-                msg.payload->getIngressCompletion().finishCompleter();
+                msg.getPersistentContext()->getIngressCompletion().finishCompleter();
                 index.erase(itr);
             }
         }
@@ -279,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_
 }
 
 
-void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings)
+void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
 {
     QueueFlowLimit *ptr = createLimit( &queue, settings );
     if (ptr) {
@@ -289,36 +261,37 @@ void QueueFlowLimit::observe(Queue& queu
 }
 
 /** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
 {
-    std::string type(QueuePolicy::getType(settings));
-
-    if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+    if (settings.dropMessagesAtLimit) {
         // The size of a RING queue is limited by design - no need for flow control.
         return 0;
     }
 
-    if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
-        settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
+    if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
         // user provided (some) flow settings manually...
-        uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
-        uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
-        uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
-        uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
-        if (flowStopCount == 0 && flowStopSize == 0) {   // disable flow control
+        if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
+            return new QueueFlowLimit(queue,
+                                      settings.flowStop.getCount(),
+                                      settings.flowResume.getCount(),
+                                      settings.flowStop.getSize(),
+                                      settings.flowResume.getSize());
+        } else {
+            //don't have a non-zero value for either the count or the
+            //size to stop at, yet at least one of these settings was
+            //provided, i.e it was set to 0 explicitly which we treat
+            //as turning it off
             return 0;
         }
-        return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
 
     if (defaultFlowStopRatio) {   // broker has a default ratio setup...
-        uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+        uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
         uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
         uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
-        uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0);  // no size by default
+        uint32_t maxMsgCount =  settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
         uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
         uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-
         return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
     return 0;
@@ -346,7 +319,7 @@ void QueueFlowLimit::getState(qpid::fram
     framing::SequenceSet ss;
     if (!index.empty()) {
         /* replicate the set of messages pending flow control */
-        for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+        for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
              itr != index.end(); ++itr) {
             ss.add(itr->first);
         }
@@ -377,10 +350,10 @@ void QueueFlowLimit::setState(const qpid
             ++i;
             fcmsg.add(first, last);
             for (SequenceNumber seq = first; seq <= last; ++seq) {
-                QueuedMessage msg;
+                Message msg;
                 queue->find(seq, msg);   // fyi: may not be found if msg is acquired & unacked
                 bool unique;
-                unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+                unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
                 // Like this to avoid tripping up unused variable warning when NDEBUG set
                 if (!unique) assert(unique);
             }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Fri Aug 10 12:04:27 2012
@@ -26,9 +26,9 @@
 #include <iostream>
 #include <memory>
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/QueuedMessage.h"
 #include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Mutex.h"
 
@@ -45,6 +45,8 @@ namespace qpid {
 namespace broker {
 
 class Broker;
+class Queue;
+struct QueueSettings;
 
 /**
  * Producer flow control: when level is > flowStop*, flow control is ON.
@@ -80,13 +82,13 @@ class Broker;
 
     QPID_BROKER_EXTERN virtual ~QueueFlowLimit();
 
-    /** the queue has added QueuedMessage.  Returns true if flow state changes */
-    QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
-    /** the queue has removed QueuedMessage.  Returns true if flow state changes */
-    QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+    /** the queue has added a message */
+    QPID_BROKER_EXTERN void enqueued(const Message&);
+    /** the queue has removed a message */
+    QPID_BROKER_EXTERN void dequeued(const Message&);
     /** ignored */
-    QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {};
-    QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
+    QPID_BROKER_EXTERN void acquired(const Message&) {};
+    QPID_BROKER_EXTERN void requeued(const Message&) {};
 
     /** for clustering: */
     QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
@@ -106,14 +108,14 @@ class Broker;
     void decode(framing::Buffer& buffer);
     uint32_t encodedSize() const;
 
-    static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings);
+    static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings);
     static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
 
     friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
 
  protected:
     // msgs waiting for flow to become available.
-    std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
+    std::map<framing::SequenceNumber, Message > index;
     mutable qpid::sys::Mutex indexLock;
 
     _qmfBroker::Queue *queueMgmtObj;
@@ -123,7 +125,7 @@ class Broker;
     QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue,
                    uint32_t flowStopCount, uint32_t flowResumeCount,
                    uint64_t flowStopSize,  uint64_t flowResumeSize);
-    static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+    static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h Fri Aug 10 12:04:27 2012
@@ -24,8 +24,8 @@
 namespace qpid {
 namespace broker {
 
-struct QueuedMessage;
 class Consumer;
+class Message;
 
 /**
  * Interface for notifying classes who want to act as 'observers' of a queue of particular
@@ -63,10 +63,10 @@ class QueueObserver
     virtual ~QueueObserver() {}
 
     // note: the Queue will hold the messageLock while calling these methods!
-    virtual void enqueued(const QueuedMessage&) = 0;
-    virtual void dequeued(const QueuedMessage&) = 0;
-    virtual void acquired(const QueuedMessage&) = 0;
-    virtual void requeued(const QueuedMessage&) = 0;
+    virtual void enqueued(const Message&) = 0;
+    virtual void dequeued(const Message&) = 0;
+    virtual void acquired(const Message&) = 0;
+    virtual void requeued(const Message&) = 0;
     virtual void consumerAdded( const Consumer& ) {};
     virtual void consumerRemoved( const Consumer& ) {};
  private:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Aug 10 12:04:27 2012
@@ -21,7 +21,6 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -32,50 +31,43 @@ using namespace qpid::broker;
 using namespace qpid::sys;
 using std::string;
 
-QueueRegistry::QueueRegistry(Broker* b) :
-    counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
+QueueRegistry::QueueRegistry(Broker* b)
+{
+    setBroker(b);
+}
 
 QueueRegistry::~QueueRegistry(){}
 
 std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, 
-                       bool autoDelete, const OwnershipToken* owner,
+QueueRegistry::declare(const string& name, const QueueSettings& settings,
                        boost::shared_ptr<Exchange> alternate,
-                       const qpid::framing::FieldTable& arguments,
                        bool recovering/*true if this declare is a
                                         result of recovering queue
-                                        definition from persistente
+                                        definition from persistent
                                         record*/)
 {
-    Queue::shared_ptr queue;
     std::pair<Queue::shared_ptr, bool> result;
     {
         RWlock::ScopedWlock locker(lock);
-        string name = declareName.empty() ? generateName() : declareName;
-        assert(!name.empty());
         QueueMap::iterator i =  queues.find(name);
-
         if (i == queues.end()) {
-            queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+            Queue::shared_ptr queue = create(name, settings);
+            //Move this to factory also?
             if (alternate) {
                 queue->setAlternateExchange(alternate);//need to do this *before* create
                 alternate->incAlternateUsers();
             }
             if (!recovering) {
-                //apply settings & create persistent record if required
-                queue->create(arguments);
-            } else {
-                //i.e. recovering a queue for which we already have a persistent record
-                queue->configure(arguments);
+                //create persistent record if required
+                queue->create();
             }
             queues[name] = queue;
-            if (lastNode) queue->setLastNodeFailure();
             result = std::pair<Queue::shared_ptr, bool>(queue, true);
         } else {
             result = std::pair<Queue::shared_ptr, bool>(i->second, false);
         }
     }
-    if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+    if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
     return result;
 }
 
@@ -89,7 +81,7 @@ void QueueRegistry::destroy(const string
             queues.erase(i);
         }
     }
-    if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
+    if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){
@@ -108,36 +100,17 @@ Queue::shared_ptr QueueRegistry::get(con
     return q;
 }
 
-string QueueRegistry::generateName(){
-    string name;
-    do {
-        std::stringstream ss;
-        ss << "tmp_" << counter++;
-        name = ss.str();
-        // Thread safety: Private function, only called with lock held
-        // so this is OK.
-    } while(queues.find(name) != queues.end());
-    return name;
-}
-
 void QueueRegistry::setStore (MessageStore* _store)
 {
-    store = _store;
+    QueueFactory::setStore(_store);
 }
 
-MessageStore* QueueRegistry::getStore() const {
-    return store;
+MessageStore* QueueRegistry::getStore() const
+{
+    return QueueFactory::getStore();
 }
 
-void QueueRegistry::updateQueueClusterState(bool _lastNode)
+void QueueRegistry::setParent(qpid::management::Manageable* _parent)
 {
-    RWlock::ScopedRlock locker(lock);
-    for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
-        if (_lastNode){
-            i->second->setLastNodeFailure();
-        } else {
-            i->second->clearLastNodeFailure();
-        }
-    }
-    lastNode = _lastNode;
+    QueueFactory::setParent(_parent);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Fri Aug 10 12:04:27 2012
@@ -22,8 +22,8 @@
 #define _QueueRegistry_
 
 #include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueueFactory.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/management/Manageable.h"
 #include "qpid/framing/FieldTable.h"
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
@@ -34,11 +34,8 @@ namespace qpid {
 namespace broker {
 
 class Queue;
-class QueueEvents;
 class Exchange;
 class OwnershipToken;
-class Broker;
-class MessageStore;
 
 /**
  * A registry of queues indexed by queue name.
@@ -47,7 +44,7 @@ class MessageStore;
  * are deleted when and only when they are no longer in use.
  *
  */
-class QueueRegistry {
+class QueueRegistry : QueueFactory {
   public:
     QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0);
     QPID_BROKER_EXTERN ~QueueRegistry();
@@ -60,11 +57,8 @@ class QueueRegistry {
      */
     QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
         const std::string& name,
-        bool durable = false,
-        bool autodelete = false,
-        const OwnershipToken* owner = 0,
+        const QueueSettings& settings,
         boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
-        const qpid::framing::FieldTable& args = framing::FieldTable(),
         bool recovering = false);
 
     /**
@@ -101,11 +95,6 @@ class QueueRegistry {
     QPID_BROKER_EXTERN boost::shared_ptr<Queue> get(const std::string& name);
 
     /**
-     * Generate unique queue name.
-     */
-    std::string generateName();
-
-    /**
      * Set the store to use.  May only be called once.
      */
     void setStore (MessageStore*);
@@ -118,7 +107,7 @@ class QueueRegistry {
     /**
      * Register the manageable parent for declared queues
      */
-    void setParent (management::Manageable* _parent) { parent = _parent; }
+    void setParent (management::Manageable*);
 
     /** Call f for each queue in the registry. */
     template <class F> void eachQueue(F f) const {
@@ -127,22 +116,10 @@ class QueueRegistry {
             f(i->second);
     }
 
-	/**
-	* Change queue mode when cluster size drops to 1 node, expands again
-	* in practice allows flow queue to disk when last name to be exectuted
-	*/
-	void updateQueueClusterState(bool lastNode);
-
 private:
     typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
     QueueMap queues;
     mutable qpid::sys::RWlock lock;
-    int counter;
-    MessageStore* store;
-    QueueEvents* events;
-    management::Manageable* parent;
-    bool lastNode; //used to set mode on queue declare
-    Broker* broker;
 };
 
 

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,228 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueSettings.h"
+#include "QueueFlowLimit.h"
+#include "MessageGroupManager.h"
+#include "qpid/types/Variant.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+
+
+namespace qpid {
+namespace broker {
+
+namespace {
+const std::string MAX_COUNT("qpid.max_count");
+const std::string MAX_SIZE("qpid.max_size");
+const std::string POLICY_TYPE("qpid.policy_type");
+const std::string POLICY_TYPE_REJECT("reject");
+const std::string POLICY_TYPE_RING("ring");
+const std::string NO_LOCAL("no-local");
+const std::string TRACE_ID("qpid.trace.id");
+const std::string TRACE_EXCLUDES("qpid.trace.exclude");
+const std::string LVQ_KEY("qpid.last_value_queue_key");
+const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+const std::string ALERT_REPEAT_GAP("qpid.alert_repeat_gap");
+const std::string ALERT_COUNT("qpid.alert_count");
+const std::string ALERT_SIZE("qpid.alert_size");
+const std::string PRIORITIES("qpid.priorities");
+const std::string FAIRSHARE("qpid.fairshare");
+const std::string FAIRSHARE_ALIAS("x-qpid-fairshare");
+
+const std::string LVQ_LEGACY("qpid.last_value_queue");
+const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
+const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse");
+
+
+bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
+{
+    if (key.find(basename) == 0) {
+        qpid::types::Variant index(key.substr(basename.size()+1));
+        settings.fairshare[index] = value;
+        return true;
+    } else {
+        return false;
+    }
+}
+bool isFairshareSetting(const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
+{
+    return handleFairshareSetting(FAIRSHARE, key, value, settings) || handleFairshareSetting(FAIRSHARE_ALIAS, key, value, settings);
+}
+}
+
+const QueueSettings::Aliases QueueSettings::aliases;
+
+QueueSettings::QueueSettings(bool d, bool a) :
+    durable(d),
+    autodelete(a),
+    priorities(0),
+    defaultFairshare(0),
+    shareGroups(false),
+    addTimestamp(false),
+    dropMessagesAtLimit(false),
+    noLocal(false),
+    autoDeleteDelay(0),
+    alertRepeatInterval(60)
+{}
+
+bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value)
+{
+    if (key == MAX_COUNT && value.asUint32() > 0) {
+        maxDepth.setCount(value);
+        return true;
+    } else if (key == MAX_SIZE && value.asUint64() > 0) {
+        maxDepth.setSize(value);
+        return true;
+    } else if (key == POLICY_TYPE) {
+        if (value.getString() == POLICY_TYPE_RING) {
+            dropMessagesAtLimit = true;
+            return true;
+        } else if (value.getString() == POLICY_TYPE_REJECT) {
+            //do nothing, that's the default
+            return true;
+        } else {
+            QPID_LOG(warning, "Unrecognised policy option: " << value);
+            return false;
+        }
+    } else if (key == NO_LOCAL) {
+        noLocal = true;
+        return true;
+    } else if (key == TRACE_ID) {
+        traceId = value.asString();
+        return true;
+    } else if (key == TRACE_EXCLUDES) {
+        traceExcludes = value.asString();
+        return true;
+    } else if (key == PRIORITIES) {
+        priorities = value;
+        return true;
+    } else if (key == FAIRSHARE) {
+        defaultFairshare = value;
+        return true;
+    } else if (isFairshareSetting(key, value, *this)) {
+        return true;
+    } else if (key == MessageGroupManager::qpidMessageGroupKey) {
+        groupKey = value.asString();
+        return true;
+    } else if (key == MessageGroupManager::qpidSharedGroup) {
+        shareGroups = value;
+        return true;
+    } else if (key == MessageGroupManager::qpidMessageGroupTimestamp) {
+        addTimestamp = value;
+        return true;
+    } else if (key == LVQ_KEY) {
+        lvqKey = value.asString();
+        return true;
+    } else if (key == LVQ_LEGACY) {
+        if (lvqKey.empty()) lvqKey = LVQ_LEGACY_KEY;
+        return true;
+    } else if (key == LVQ_LEGACY_NOBROWSE) {
+        QPID_LOG(warning, "Ignoring 'no-browse' directive for LVQ; it is no longer necessary");
+        if (lvqKey.empty()) lvqKey = LVQ_LEGACY_KEY;
+        return true;
+    } else if (key == AUTO_DELETE_TIMEOUT) {
+        autoDeleteDelay = value;
+        return true;
+    } else if (key == QueueFlowLimit::flowStopCountKey) {
+        flowStop.setCount(value);
+        return true;
+    } else if (key == QueueFlowLimit::flowResumeCountKey) {
+        flowResume.setCount(value);
+        return true;
+    } else if (key == QueueFlowLimit::flowStopSizeKey) {
+        flowStop.setSize(value);
+        return true;
+    } else if (key == QueueFlowLimit::flowResumeSizeKey) {
+        flowResume.setSize(value);
+        return true;
+    } else if (key == ALERT_REPEAT_GAP) {
+        alertRepeatInterval = value;
+        return true;
+    } else if (key == ALERT_COUNT) {
+        alertThreshold.setCount(value);
+        return true;
+    } else if (key == ALERT_SIZE) {
+        alertThreshold.setSize(value);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void QueueSettings::validate() const
+{
+    if (lvqKey.size() && priorities > 0)
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << PRIORITIES << " for the same queue"));
+    if ((fairshare.size() || defaultFairshare) && priorities == 0)
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify fairshare settings when queue is not enabled for priorities"));
+    if (fairshare.size() > priorities)
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot have fairshare set for priority levels greater than " << priorities));
+    if (groupKey.size() && lvqKey.size())
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << MessageGroupManager::qpidMessageGroupKey << " for the same queue"));
+    if (groupKey.size() && priorities)
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << PRIORITIES << " and " << MessageGroupManager::qpidMessageGroupKey << " for the same queue"));
+    if (shareGroups && groupKey.empty()) {
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MessageGroupManager::qpidSharedGroup
+                                                               << " if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
+    }
+    if (addTimestamp && groupKey.empty()) {
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MessageGroupManager::qpidMessageGroupTimestamp
+                                                               << " if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
+    }
+
+    // @todo: remove once "sticky" consumers are supported - see QPID-3347
+    if (!shareGroups && groupKey.size()) {
+        throw qpid::framing::InvalidArgumentException(QPID_MSG("Only shared groups are supported at present; " << MessageGroupManager::qpidSharedGroup
+                                                               << " is required if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
+    }
+}
+
+void QueueSettings::populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused)
+{
+    original = inputs;
+    for (qpid::types::Variant::Map::const_iterator i = inputs.begin(); i != inputs.end(); ++i) {
+        Aliases::const_iterator a = aliases.find(i->first);
+        if (!handle((a != aliases.end() ? a->second : i->first), i->second)) unused.insert(*i);
+    }
+}
+void QueueSettings::populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused)
+{
+    qpid::types::Variant::Map o;
+    qpid::amqp_0_10::translate(inputs, original);
+    populate(original, o);
+    qpid::amqp_0_10::translate(o, unused);
+}
+std::map<std::string, qpid::types::Variant> QueueSettings::asMap() const
+{
+    return original;
+}
+
+QueueSettings::Aliases::Aliases()
+{
+    insert(value_type("x-qpid-priorities", "qpid.priorities"));
+    insert(value_type("x-qpid-fairshare", "qpid.fairshare"));
+    insert(value_type("x-qpid-minimum-alert-repeat-gap", "qpid.alert_repeat_gap"));
+    insert(value_type("x-qpid-maximum-message-count", "qpid.alert_count"));
+    insert(value_type("x-qpid-maximum-message-size", "qpid.alert_size"));
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,92 @@
+#ifndef QPID_BROKER_QUEUESETTINGS_H
+#define QPID_BROKER_QUEUESETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueueDepth.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/framing/FieldTable.h"
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace types {
+class Variant;
+}
+namespace broker {
+
+/**
+ * Defines the various queue configuration settings that can be specified
+ */
+struct QueueSettings
+{
+    QPID_BROKER_EXTERN QueueSettings(bool durable=false, bool autodelete=false);
+
+    bool durable;
+    bool autodelete;
+
+    //basic queue types:
+    std::string lvqKey;
+    uint32_t priorities;
+    uint32_t defaultFairshare;
+    std::map<uint32_t,uint32_t> fairshare;
+
+    //message groups:
+    std::string groupKey;
+    bool shareGroups;
+    bool addTimestamp;//not actually used; always on at present?
+
+    QueueDepth maxDepth;
+    bool dropMessagesAtLimit;//aka ring queue policy
+
+    bool noLocal;
+    std::string traceId;
+    std::string traceExcludes;
+    uint64_t autoDeleteDelay;//queueTtl?
+
+    //flow control:
+    QueueDepth flowStop;
+    QueueDepth flowResume;
+
+    //threshold events:
+    QueueDepth alertThreshold;
+    int64_t alertRepeatInterval;
+
+    //yuck, yuck
+    qpid::framing::FieldTable storeSettings;
+    std::map<std::string, qpid::types::Variant> original;
+
+    bool handle(const std::string& key, const qpid::types::Variant& value);
+    void validate() const;
+    QPID_BROKER_EXTERN void populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused);
+    QPID_BROKER_EXTERN void populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused);
+    std::map<std::string, qpid::types::Variant> asMap() const;
+
+    struct Aliases : std::map<std::string, std::string>
+    {
+        Aliases();
+    };
+    static const Aliases aliases;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUESETTINGS_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h Fri Aug 10 12:04:27 2012
@@ -22,8 +22,8 @@
 #define _QueuedMessage_
 
 #include "qpid/broker/Message.h"
-#include "BrokerImportExport.h"
-#include <iosfwd>
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/broker/BrokerImportExport.h"
 
 namespace qpid {
 namespace broker {
@@ -32,20 +32,19 @@ class Queue;
 
 struct QueuedMessage
 {
-    boost::intrusive_ptr<Message> payload;
+    Message message;
     framing::SequenceNumber position;
-    typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status;
-    Status status;
+    enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
     Queue* queue;
 
-    QueuedMessage(Queue* q=0,
-                  boost::intrusive_ptr<Message> msg=0,
-                  framing::SequenceNumber sn=0,
-                  Status st=AVAILABLE
-    ) :  payload(msg), position(sn), status(st), queue(q) {}
+    QueuedMessage() : queue(0) {}
+    QueuedMessage(Queue* q, Message msg, framing::SequenceNumber sn) :
+        message(msg), position(sn), queue(q) {}
+    QueuedMessage(Queue* q) : queue(q) {}
 };
 
-inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
+inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
+{
     return a.position < b.position;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp Fri Aug 10 12:04:27 2012
@@ -22,10 +22,9 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/RecoveredDequeue.h"
 
-using boost::intrusive_ptr;
 using namespace qpid::broker;
 
-RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
 {
     queue->recoverPrepared(msg);
 }
@@ -38,11 +37,11 @@ bool RecoveredDequeue::prepare(Transacti
 
 void RecoveredDequeue::commit() throw()
 {
-    queue->enqueueAborted(msg);
+    queue->dequeueCommited(msg);
 }
 
 void RecoveredDequeue::rollback() throw()
 {
-    queue->process(msg);
+    queue->dequeueAborted(msg);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h Fri Aug 10 12:04:27 2012
@@ -26,8 +26,6 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/TxOp.h"
 
-#include <boost/intrusive_ptr.hpp>
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -36,18 +34,17 @@ namespace qpid {
     namespace broker {
         class RecoveredDequeue : public TxOp{
             boost::shared_ptr<Queue> queue;
-            boost::intrusive_ptr<Message> msg;
+            Message msg;
 
         public:
-            RecoveredDequeue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+            RecoveredDequeue(boost::shared_ptr<Queue> queue, Message msg);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~RecoveredDequeue(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
             boost::shared_ptr<Queue> getQueue() const { return queue; }
-            boost::intrusive_ptr<Message> getMessage() const { return msg; }
+            Message getMessage() const { return msg; }
         };
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Fri Aug 10 12:04:27 2012
@@ -22,10 +22,9 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/RecoveredEnqueue.h"
 
-using boost::intrusive_ptr;
 using namespace qpid::broker;
 
-RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
 {
     queue->recoverPrepared(msg);
 }
@@ -36,7 +35,7 @@ bool RecoveredEnqueue::prepare(Transacti
 }
 
 void RecoveredEnqueue::commit() throw(){
-    queue->process(msg);
+    queue->enqueueCommited(msg);
 }
 
 void RecoveredEnqueue::rollback() throw(){

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h Fri Aug 10 12:04:27 2012
@@ -26,8 +26,6 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/TxOp.h"
 
-#include <boost/intrusive_ptr.hpp>
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -36,19 +34,17 @@ namespace qpid {
 namespace broker {
 class RecoveredEnqueue : public TxOp{
     boost::shared_ptr<Queue> queue;
-    boost::intrusive_ptr<Message> msg;
+    Message msg;
 
   public:
-    RecoveredEnqueue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+    RecoveredEnqueue(boost::shared_ptr<Queue> queue, Message msg);
     virtual bool prepare(TransactionContext* ctxt) throw();
     virtual void commit() throw();
     virtual void rollback() throw();
     virtual ~RecoveredEnqueue(){}
-    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
     boost::shared_ptr<Queue> getQueue() const { return queue; }
-    boost::intrusive_ptr<Message> getMessage() const { return msg; }
-            
+    Message getMessage() const { return msg; }
 };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Aug 10 12:04:27 2012
@@ -21,11 +21,13 @@
 #include "qpid/broker/RecoveryManagerImpl.h"
 
 #include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/reply_exceptions.h"
 
 using boost::dynamic_pointer_cast;
@@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImp
 
 class RecoverableMessageImpl : public RecoverableMessage
 {
-    intrusive_ptr<Message> msg;
+    Message msg;
 public:
-    RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
+    RecoverableMessageImpl(const Message& _msg);
     ~RecoverableMessageImpl() {};
     void setPersistenceId(uint64_t id);
     void setRedelivered();
@@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryMan
 
 RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
 {
-    boost::intrusive_ptr<Message> message(new Message());
-    message->decodeHeader(buffer);
-    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
+    //TODO: determine encoding/version actually used
+    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+    transfer->decodeHeader(buffer);
+    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
 }
 
 RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, 
@@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComple
     exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
 }
 
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
-{
-    if (!msg->isPersistent()) {
-        msg->forcePersistent(); // set so that message will get dequeued from store.
-    }
-}
+RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {}
 
 bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
 {
@@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent
 
 void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
 {
-    msg->decodeContent(buffer);
+    msg.getPersistentContext()->decodeContent(buffer);
 }
 
 void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
@@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Que
 
 void RecoverableMessageImpl::setPersistenceId(uint64_t id)
 {
-    msg->setPersistenceId(id);
+    msg.getPersistentContext()->setPersistenceId(id);
 }
 
 void RecoverableMessageImpl::setRedelivered()
 {
-    msg->redeliver();
+    msg.deliver();//increment delivery count (but at present that isn't recorded durably)
 }
 
 void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
@@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenc
 {
     queue->setPersistenceId(id);
 }
-       
+
 uint64_t RecoverableQueueImpl::getPersistenceId() const
 {
 	return queue->getPersistenceId();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Aug 10 12:04:27 2012
@@ -29,7 +29,7 @@
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/TxAccept.h"
-#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/SequenceSet.h"
@@ -65,9 +65,8 @@ using qpid::management::Manageable;
 using qpid::management::Args;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
-SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
+SemanticState::SemanticState(SessionState& ss)
     : session(ss),
-      deliveryAdapter(da),
       tagGenerator("sgen"),
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
@@ -89,7 +88,7 @@ void SemanticState::closed() {
         if (dtxBuffer.get()) {
             dtxBuffer->fail();
         }
-        recover(true);
+        requeue();
 
         //now unsubscribe, which may trigger queue deletion and thus
         //needs to occur after the requeueing of unacked messages
@@ -124,7 +123,7 @@ void SemanticState::consume(const string
                          resumeId, resumeTtl, arguments);
     if (!c)                     // Create plain consumer
         c = ConsumerImpl::shared_ptr(
-            new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+            new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
                              resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
@@ -281,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           const string& _name,
                                           Queue::shared_ptr _queue,
                                           bool ack,
-                                          bool _acquire,
+                                          SubscriptionType type,
                                           bool _exclusive,
                                           const string& _tag,
                                           const string& _resumeId,
@@ -289,11 +288,11 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           const framing::FieldTable& _arguments
 
 ) :
-    Consumer(_name, _acquire),
+Consumer(_name, type),
     parent(_parent),
     queue(_queue),
     ackExpected(ack),
-    acquire(_acquire),
+    acquire(type == CONSUMER),
     blocked(true),
     exclusive(_exclusive),
     resumeId(_resumeId),
@@ -340,32 +339,42 @@ OwnershipToken* SemanticState::ConsumerI
     return &(parent->session);
 }
 
-bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+{
+    return deliver(cursor, msg, shared_from_this());
+}
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
 {
     assertClusterSafe();
-    allocateCredit(msg.payload);
-    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
-                          shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
+    allocateCredit(msg);
+    DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+                          consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
-    parent->deliver(record, sync);
+    const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
+
+    record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+                                         ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
+                                         acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
+                                         msg.getAnnotations(),
+                                         sync));
     if (credit.isWindowMode() || ackExpected || !acquire) {
         parent->record(record);
     }
     if (acquire && !ackExpected) {  // auto acquire && auto accept
-        msg.queue->dequeue(0, msg);
+        queue->dequeue(0 /*ctxt*/, cursor);
         record.setEnded();
     }
     if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
 }
 
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
+bool SemanticState::ConsumerImpl::filter(const Message&)
 {
     return true;
 }
 
-bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::accept(const Message& msg)
 {
     assertClusterSafe();
     // TODO aconway 2009-06-08: if we have byte & message credit but
@@ -389,21 +398,21 @@ ostream& operator<<(ostream& o, const Co
 }
 }
 
-void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
 {
     assertClusterSafe();
     Credit original = credit;
-    credit.consume(1, msg->getRequiredCredit());
+    credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << original << " now " << credit);
 
 }
 
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
 {
-    bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+    bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
     QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
-             <<  " credit for message of " << msg->getRequiredCredit() << " bytes: "
+             <<  " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
              << credit);
     return enoughCredit;
 }
@@ -421,7 +430,6 @@ void SemanticState::disable(ConsumerImpl
         session.getConnection().outputTasks.removeOutputTask(c.get());
 }
 
-
 void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
     disable(c);
@@ -435,49 +443,20 @@ void SemanticState::cancel(ConsumerImpl:
     c->cancel();
 }
 
-void SemanticState::handle(intrusive_ptr<Message> msg) {
-    if (txBuffer.get()) {
-        TxPublish* deliverable(new TxPublish(msg));
-        TxOp::shared_ptr op(deliverable);
-        route(msg, *deliverable);
-        txBuffer->enlist(op);
-    } else {
-        DeliverableMessage deliverable(msg);
-        route(msg, deliverable);
-        if (msg->isContentReleaseRequested()) {
-            // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
-            // presence of these messages). Do not change these without also checking these tests.
-            if (msg->isContentReleaseBlocked()) {
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
-            } else {
-                msg->releaseContent();
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content released");
-            }
-        }
-    }
-}
-
-namespace
+TxBuffer* SemanticState::getTxBuffer()
 {
-const std::string nullstring;
+    return txBuffer.get();
 }
 
-void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
-    msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
+void SemanticState::route(Message& msg, Deliverable& strategy) {
+    msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
 
-    std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName
-        || cacheExchange->isDestroyed())
-    {
+    std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
+    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
-    }
-    cacheExchange->setProperties(msg);
 
     /* verify the userid if specified: */
-    std::string id =
-    	msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+    std::string id = msg.getUserId();
     if (authMsg &&  !id.empty() && !session.getConnection().isAuthenticatedUser(id))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -487,9 +466,9 @@ void SemanticState::route(intrusive_ptr<
     AclModule* acl = getSession().getBroker().getAcl();
     if (acl && acl->doTransferAcl())
     {
-        if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
+        if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg.getRoutingKey() ))
             throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " <<
-                                               exchangeName << " with routing-key " << msg->getRoutingKey()));
+                                               exchangeName << " with routing-key " << msg.getRoutingKey()));
     }
 
     cacheExchange->route(strategy);
@@ -501,9 +480,6 @@ void SemanticState::route(intrusive_ptr<
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy);
         }
-        if (!strategy.delivered) {
-            msg->destroy();
-        }
     }
 
 }
@@ -543,28 +519,20 @@ void SemanticState::ConsumerImpl::comple
     }
 }
 
-void SemanticState::recover(bool requeue)
+void SemanticState::requeue()
 {
-    if(requeue){
-        //take copy and clear unacked as requeue may result in redelivery to this session
-        //which will in turn result in additions to unacked
-        DeliveryRecords copy = unacked;
-        unacked.clear();
-        for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
-    }else{
-        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
-        //unconfirmed messages re redelivered and therefore have their
-        //id adjusted, confirmed messages are not and so the ordering
-        //w.r.t id is lost
-        sort(unacked.begin(), unacked.end());
-    }
+    //take copy and clear unacked as requeue may result in redelivery to this session
+    //which will in turn result in additions to unacked
+    DeliveryRecords copy = unacked;
+    unacked.clear();
+    for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
     getSession().setUnackedCount(unacked.size());
 }
 
-void SemanticState::deliver(DeliveryRecord& msg, bool sync)
-{
-    return deliveryAdapter.deliver(msg, sync);
-}
+
+SessionContext& SemanticState::getSession() { return session; }
+const SessionContext& SemanticState::getSession() const { return session; }
+
 
 const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Aug 10 12:04:27 2012
@@ -26,7 +26,6 @@
 #include "qpid/broker/Consumer.h"
 #include "qpid/broker/Credit.h"
 #include "qpid/broker/Deliverable.h"
-#include "qpid/broker/DeliveryAdapter.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/DtxManager.h"
@@ -34,12 +33,15 @@
 #include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/TxBuffer.h"
 
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/AtomicValue.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Subscription.h"
 
 #include <list>
@@ -47,13 +49,15 @@
 #include <vector>
 
 #include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
 #include <boost/cast.hpp>
 
 namespace qpid {
 namespace broker {
 
+class Exchange;
+class MessageStore;
 class SessionContext;
+class SessionState;
 
 /**
  *
@@ -94,28 +98,28 @@ class SemanticState : private boost::non
         int deliveryCount;
         qmf::org::apache::qpid::broker::Subscription* mgmtObject;
 
-        bool checkCredit(boost::intrusive_ptr<Message>& msg);
-        void allocateCredit(boost::intrusive_ptr<Message>& msg);
+        bool checkCredit(const Message& msg);
+        void allocateCredit(const Message& msg);
         bool haveCredit();
 
       protected:
         QPID_BROKER_EXTERN virtual bool doDispatch();
         size_t unacked() { return parent->unacked.size(); }
+        QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
 
       public:
         typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
 
-        QPID_BROKER_EXTERN ConsumerImpl(
-            SemanticState* parent,
-            const std::string& name, boost::shared_ptr<Queue> queue,
-            bool ack, bool acquire, bool exclusive,
-            const std::string& tag, const std::string& resumeId, uint64_t resumeTtl,
-            const framing::FieldTable& arguments);
-        QPID_BROKER_EXTERN virtual ~ConsumerImpl();
+        QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent,
+                     const std::string& name, boost::shared_ptr<Queue> queue,
+                     bool ack, SubscriptionType type, bool exclusive,
+                     const std::string& tag, const std::string& resumeId,
+                     uint64_t resumeTtl, const framing::FieldTable& arguments);
+        QPID_BROKER_EXTERN ~ConsumerImpl();
         QPID_BROKER_EXTERN OwnershipToken* getSession();
-        QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg);
-        QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr<Message> msg);
-        QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr<Message> msg);
+        QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
+        QPID_BROKER_EXTERN bool filter(const Message&);
+        QPID_BROKER_EXTERN bool accept(const Message&);
         QPID_BROKER_EXTERN void cancel() {}
 
         QPID_BROKER_EXTERN void disableNotify();
@@ -153,7 +157,7 @@ class SemanticState : private boost::non
         SemanticState& getParent() { return *parent; }
         const SemanticState& getParent() const { return *parent; }
 
-        void acknowledged(const broker::QueuedMessage&) {}
+        void acknowledged(const DeliveryRecord&) {}
 
         // manageable entry points
         QPID_BROKER_EXTERN management::ManagementObject*
@@ -168,8 +172,7 @@ class SemanticState : private boost::non
   private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
 
-    SessionContext& session;
-    DeliveryAdapter& deliveryAdapter;
+    SessionState& session;
     ConsumerImplMap consumers;
     NameGenerator tagGenerator;
     DeliveryRecords unacked;
@@ -185,7 +188,6 @@ class SemanticState : private boost::non
     //needed for queue delete events in auto-delete:
     const std::string connectionId;
 
-    void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
     void checkDtxTimeout();
 
     bool complete(DeliveryRecord&);
@@ -196,11 +198,11 @@ class SemanticState : private boost::non
 
   public:
 
-    SemanticState(DeliveryAdapter&, SessionContext&);
+    SemanticState(SessionState&);
     ~SemanticState();
 
-    SessionContext& getSession() { return session; }
-    const SessionContext& getSession() const { return session; }
+    SessionContext& getSession();
+    const SessionContext& getSession() const;
 
     const ConsumerImpl::shared_ptr find(const std::string& destination) const;
     bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
@@ -239,12 +241,12 @@ class SemanticState : private boost::non
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
-    void recover(bool requeue);
-    void deliver(DeliveryRecord& message, bool sync);
+    TxBuffer* getTxBuffer();
+    void requeue();
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
-    void handle(boost::intrusive_ptr<Message> msg);
+    void route(Message& msg, Deliverable& strategy);
 
     void completed(const framing::SequenceSet& commands);
     void accepted(const framing::SequenceSet& commands);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Aug 10 12:04:27 2012
@@ -264,7 +264,7 @@ QueueQueryResult SessionAdapter::QueueHa
                                 queue->isDurable(),
                                 queue->hasExclusiveOwner(),
                                 queue->isAutoDelete(),
-                                queue->getSettings(),
+                                queue->getEncodableSettings(),
                                 queue->getMessageCount(),
                                 queue->getConsumerCount());
     } else {
@@ -294,19 +294,24 @@ void SessionAdapter::QueueHandlerImpl::d
         queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
+        QueueSettings settings(durable, autoDelete);
+        try {
+            settings.populate(arguments, settings.storeSettings);
+        } catch (const qpid::types::Exception& e) {
+            throw InvalidArgumentException(e.what());
+        }
+
         std::pair<Queue::shared_ptr, bool> queue_created =
-            getBroker().createQueue(name, durable,
-                                    autoDelete,
+            getBroker().createQueue(name, settings,
                                     exclusive ? &session : 0,
                                     alternateExchange,
-                                    arguments,
                                     getConnection().getUserId(),
                                     getConnection().getUrl());
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
             //handle automatic cleanup:
-            if (exclusive) {
+            if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
         } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Aug 10 12:04:27 2012
@@ -32,7 +32,7 @@ using namespace std;
 using namespace qpid::sys;
 
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
-    : amqp_0_10::SessionHandler(&c.getOutput(), ch),
+    : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c),
       proxy(out),
       clusterOrderProxy(c.getClusterOrderOutput() ?
@@ -75,7 +75,7 @@ ConnectionState& SessionHandler::getConn
 const ConnectionState& SessionHandler::getConnection() const { return connection; }
 
 void SessionHandler::handleDetach() {
-    amqp_0_10::SessionHandler::handleDetach();
+    qpid::amqp_0_10::SessionHandler::handleDetach();
     assert(&connection.getChannel(channel.get()) == this);
     if (session.get())
         connection.getBroker().getSessionManager().detach(session);
@@ -125,7 +125,7 @@ void SessionHandler::attached(const std:
 {
     if (session.get()) {
         session->addManagementObject(); // Delayed from attachAs()
-        amqp_0_10::SessionHandler::attached(name);
+        qpid::amqp_0_10::SessionHandler::attached(name);
     } else {
         SessionId id(connection.getUserId(), name);
         SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Aug 10 12:04:27 2012
@@ -41,7 +41,7 @@ class SessionState;
  * receives incoming frames, handles session controls and manages the
  * association between the channel and a session.
  */
-class SessionHandler : public amqp_0_10::SessionHandler {
+class SessionHandler : public qpid::amqp_0_10::SessionHandler {
   public:
     class ErrorListener {
       public:



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org