You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/09/21 22:39:41 UTC

svn commit: r697603 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ tests/

Author: gsim
Date: Sun Sep 21 13:39:40 2008
New Revision: 697603

URL: http://svn.apache.org/viewvc?rev=697603&view=rev
Log:
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp
- moved QueuedMessage definition into its own header file
- added checks for requeue and dequeue
- split QueuePolicy logic into different sub classes
Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Sun Sep 21 13:39:40 2008
@@ -464,6 +464,7 @@
   qpid/broker/PersistableQueue.h \
   qpid/broker/Prefetch.h \
   qpid/broker/QueueBindings.h \
+  qpid/broker/QueuedMessage.h \
   qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.h \
   qpid/broker/RecoverableConfig.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Sun Sep 21 13:39:40 2008
@@ -21,47 +21,33 @@
 #ifndef _Consumer_
 #define _Consumer_
 
-namespace qpid {
-    namespace broker {
-		class Queue;
-}}
-
 #include "Message.h"
+#include "QueuedMessage.h"
 #include "OwnershipToken.h"
 
 namespace qpid {
-    namespace broker {
+namespace broker {
 
-        struct QueuedMessage
-        {
-            boost::intrusive_ptr<Message> payload;
-            framing::SequenceNumber position;
-            Queue* queue;
-			
-            QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
-			               payload(msg), position(sn), queue(q) {}
-            QueuedMessage(Queue* q) : queue(q) {}
-        };
-        
-
-        class Consumer {
-            const bool acquires;
-        public:
-            typedef boost::shared_ptr<Consumer> shared_ptr;            
-
-            framing::SequenceNumber position;
-
-            Consumer(bool preAcquires = true) : acquires(preAcquires) {}
-            bool preAcquires() const { return acquires; }
-            virtual bool deliver(QueuedMessage& msg) = 0;
-            virtual void notify() = 0;
-            virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
-            virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
-            virtual OwnershipToken* getSession() = 0;
-            virtual ~Consumer(){}
-        };
-    }
-}
+class Queue;
+
+class Consumer {
+    const bool acquires;
+  public:
+    typedef boost::shared_ptr<Consumer> shared_ptr;            
+    
+    framing::SequenceNumber position;
+    
+    Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+    bool preAcquires() const { return acquires; }
+    virtual bool deliver(QueuedMessage& msg) = 0;
+    virtual void notify() = 0;
+    virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
+    virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+    virtual OwnershipToken* getSession() = 0;
+    virtual ~Consumer(){}
+};
+
+}}
 
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Sun Sep 21 13:39:40 2008
@@ -130,7 +130,7 @@
 
 void DeliveryRecord::accept(TransactionContext* ctxt) {
     if (acquired && !ended) {
-        queue->dequeue(ctxt, msg.payload);
+        queue->dequeue(ctxt, msg);
         setEnded();
         QPID_LOG(debug, "Accepted " << id);
     }
@@ -138,7 +138,7 @@
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
     if (acquired && !ended) {
-        queue->dequeue(ctxt, msg.payload);
+        queue->dequeue(ctxt, msg);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Sun Sep 21 13:39:40 2008
@@ -186,6 +186,8 @@
 }
 
 void Queue::requeue(const QueuedMessage& msg){
+    if (policy.get() && !policy->isEnqueued(msg)) return;
+
     Listeners copy;
     {    
         Mutex::ScopedLock locker(messageLock);
@@ -415,29 +417,10 @@
     Listeners copy;
     {
         Mutex::ScopedLock locker(messageLock);   
-        messages.push_back(QueuedMessage(this, msg, ++sequence));
-        if (policy.get()) {
-            policy->enqueued(msg->contentSize());
-            if (policy->limitExceeded()) {
-                if (!policyExceeded) {
-                    policyExceeded = true;
-                    QPID_LOG(info, "Queue size exceeded policy for " << name);
-                }
-                if (store) {
-                    QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
-                    msg->releaseContent(store);
-                } else {
-                    QPID_LOG(error, "Message " << msg << " on " << name
-                             << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
-                    throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
-                }
-            } else {
-                if (policyExceeded) {
-                    policyExceeded = false;
-                    QPID_LOG(info, "Queue size within policy for " << name);
-                }
-            }
-        }
+        QueuedMessage qm(this, msg, ++sequence);
+        if (policy.get()) policy->tryEnqueue(qm);
+
+        messages.push_back(qm);
         listeners.swap(copy);
     }
     for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
@@ -486,15 +469,16 @@
 }
 
 // return true if store exists, 
-bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
+    if (policy.get() && !policy->isEnqueued(msg)) return false;
     {
         Mutex::ScopedLock locker(messageLock);
         dequeued(msg);
     }
-    if (msg->isPersistent() && store) {
-        msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
-        boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+    if (msg.payload->isPersistent() && store) {
+        msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+        boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);
         return true;
     }
@@ -508,7 +492,7 @@
  */
 void Queue::popAndDequeue()
 {
-    boost::intrusive_ptr<Message> msg = messages.front().payload;
+    QueuedMessage msg = messages.front();
     messages.pop_front();
     dequeue(0, msg);
 }
@@ -517,15 +501,15 @@
  * Updates policy and management when a message has been dequeued,
  * expects messageLock to be held
  */
-void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+void Queue::dequeued(const QueuedMessage& msg)
 {
-    if (policy.get()) policy->dequeued(msg->contentSize());
+    if (policy.get()) policy->dequeued(msg);
     if (mgmtObject != 0){
         mgmtObject->inc_msgTotalDequeues  ();
-        mgmtObject->inc_byteTotalDequeues (msg->contentSize());
-        if (msg->isPersistent ()){
+        mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+        if (msg.payload->isPersistent ()){
             mgmtObject->inc_msgPersistDequeues ();
-            mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+            mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
         }
     }
 }
@@ -551,10 +535,7 @@
 
 void Queue::configure(const FieldTable& _settings)
 {
-    std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
-    if (_policy->getMaxCount() || _policy->getMaxSize()) {
-        setPolicy(_policy);
-    }
+    setPolicy(QueuePolicy::createQueuePolicy(_settings));
     //set this regardless of owner to allow use of no-local with exclusive consumers also
     noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
@@ -720,6 +701,19 @@
     }
 }
 
+bool Queue::releaseMessageContent(const QueuedMessage& m)
+{
+    if (store) {
+        QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
+        m.payload->releaseContent(store);
+        return true;
+    } else {
+        QPID_LOG(warning, "Message " << m.position << " on " << name
+                 << " cannot be released from memory as the queue is not durable");
+        return false;
+    }    
+}
+
 ManagementObject* Queue::GetManagementObject (void) const
 {
     return (ManagementObject*) mgmtObject;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Sun Sep 21 13:39:40 2008
@@ -101,7 +101,7 @@
 
             bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
-            void dequeued(boost::intrusive_ptr<Message>& msg);
+            void dequeued(const QueuedMessage& msg);
             void popAndDequeue();
 
         public:
@@ -180,7 +180,7 @@
             /**
              * dequeue from store (only done once messages is acknowledged)
              */
-            bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+            bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
 
             /**
              * Gets the next available message 
@@ -219,6 +219,8 @@
             template <class F> void eachBinding(const F& f) {
                 bindings.eachBinding(f);
             }
+
+            bool releaseMessageContent(const QueuedMessage&);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Sun Sep 21 13:39:40 2008
@@ -19,39 +19,78 @@
  *
  */
 #include "QueuePolicy.h"
+#include "Queue.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) : 
-    maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
-
-QueuePolicy::QueuePolicy(const FieldTable& settings) :
-    maxCount(getInt(settings, maxCountKey, 0)), 
-    maxSize(getInt(settings, maxSizeKey, defaultMaxSize)), count(0), size(0) {}
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
+    maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
 
 void QueuePolicy::enqueued(uint64_t _size)
 {
-    if (maxCount) count++;
+    if (maxCount) ++count;
     if (maxSize) size += _size;
 }
 
 void QueuePolicy::dequeued(uint64_t _size)
 {
-    if (maxCount) count--;
+    if (maxCount) --count;
     if (maxSize) size -= _size;
 }
 
-bool QueuePolicy::limitExceeded()
+bool QueuePolicy::checkLimit(const QueuedMessage& m)
+{
+    bool exceeded = (maxSize && (size.get() + m.payload->contentSize()) > maxSize) || (maxCount && (count.get() + 1) > maxCount);
+    if (exceeded) {
+        if (!policyExceeded) {
+            policyExceeded = true;
+            QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+        }
+    } else {
+        if (policyExceeded) {
+            policyExceeded = false;
+            QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+        }
+    }
+    return !exceeded;
+}
+
+void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+{
+    if (checkLimit(m)) {
+        enqueued(m);
+    } else {
+        std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
+        throw ResourceLimitExceededException(
+            QPID_MSG("Policy exceeded on " << queue << " by message " << m.position 
+                     << " of size " << m.payload->contentSize() << " , policy: " << *this));
+    }
+}
+
+void QueuePolicy::enqueued(const QueuedMessage& m)
+{
+    enqueued(m.payload->contentSize());
+}
+
+void QueuePolicy::dequeued(const QueuedMessage& m)
+{
+    dequeued(m.payload->contentSize());
+}
+
+bool QueuePolicy::isEnqueued(const QueuedMessage&)
 {
-    return (maxSize && size > maxSize) || (maxCount && count > maxCount);
+    return true;
 }
 
 void QueuePolicy::update(FieldTable& settings)
 {
     if (maxCount) settings.setInt(maxCountKey, maxCount);
-    if (maxSize) settings.setInt(maxSizeKey, maxSize);    
+    if (maxSize) settings.setInt(maxSizeKey, maxSize);
+    settings.setString(typeKey, type);
 }
 
 
@@ -62,6 +101,17 @@
     else return defaultValue;
 }
 
+std::string QueuePolicy::getType(const FieldTable& settings)
+{
+    FieldTable::ValuePtr v = settings.get(typeKey);
+    if (v && v->convertsTo<std::string>()) {
+        std::string t = v->get<std::string>();
+        transform(t.begin(), t.end(), t.begin(), tolower);        
+        if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
+    }
+    return REJECT;
+}
+
 void QueuePolicy::setDefaultMaxSize(uint64_t s)
 {
     defaultMaxSize = s;
@@ -69,20 +119,123 @@
 
 const std::string QueuePolicy::maxCountKey("qpid.max_count");
 const std::string QueuePolicy::maxSizeKey("qpid.max_size");
+const std::string QueuePolicy::typeKey("qpid.policy_type");
+const std::string QueuePolicy::REJECT("reject");
+const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
+const std::string QueuePolicy::RING("ring");
+const std::string QueuePolicy::RING_STRICT("ring_strict");
 uint64_t QueuePolicy::defaultMaxSize(0);
 
+FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : 
+    QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+
+bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+{
+    return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+}
+
+RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
+    QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+
+void RingQueuePolicy::enqueued(const QueuedMessage& m)
+{
+    QueuePolicy::enqueued(m);
+    qpid::sys::Mutex::ScopedLock l(lock);
+    queue.push_back(m);
+}
+
+void RingQueuePolicy::dequeued(const QueuedMessage& m)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    QueuePolicy::dequeued(m);
+    //find and remove m from queue
+    for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+        if (i->position == m.position) {
+            queue.erase(i);
+            break;
+        }
+    }
+}
+
+bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    //for non-strict ring policy, a message can be dequeued before acked; need to detect this
+    for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+        if (i->position == m.position) {
+            return true;
+        }
+    }
+    return false;
+}
+
+bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+{
+    if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
+    
+    QueuedMessage oldest;
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        oldest = queue.front();
+    }
+    if (oldest.queue->acquire(oldest) || !strict) {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        if (oldest.position == queue.front().position) {
+            queue.pop_front();
+            QPID_LOG(debug, "Ring policy triggered in queue " 
+                     << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+                     << ": removed message " << oldest.position << " to make way for " << m.position);
+        }
+        return true;
+    } else {
+        QPID_LOG(debug, "Ring policy could not be triggered in queue " 
+                 << (m.queue ? m.queue->getName() : std::string("unknown queue")) 
+                 << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+        //in strict mode, if oldest message has been delivered (hence
+        //cannot be acquired) but not yet acked, it should not be
+        //removed and the attempted enqueue should fail
+        return false;
+    }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
+{
+    uint32_t maxCount = getInt(settings, maxCountKey, 0);
+    uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
+    if (maxCount || maxSize) {
+        return createQueuePolicy(maxCount, maxSize, getType(settings));
+    } else {
+        return std::auto_ptr<QueuePolicy>();
+    }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+    if (type == RING || type == RING_STRICT) {
+        return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+    } else if (type == FLOW_TO_DISK) {
+        return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+    } else {
+        return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+    }
+
+}
+
+
 namespace qpid {
     namespace broker {
 
 std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
 {
-    if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
-    else out << "size unlimited, current=" << p.size;
+    if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+    else out << "size: unlimited";
     out << "; ";
-    if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
-    else out << "count unlimited, current=" << p.count;    
+    if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+    else out << "count: unlimited";    
+    out << "; type=" << p.type;
     return out;
 }
 
     }
 }
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Sun Sep 21 13:39:40 2008
@@ -21,40 +21,85 @@
 #ifndef _QueuePolicy_
 #define _QueuePolicy_
 
+#include <deque>
 #include <iostream>
+#include <memory>
+#include "QueuedMessage.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
 
 namespace qpid {
-    namespace broker {
-        class QueuePolicy
-        {
-            static const std::string maxCountKey;
-            static const std::string maxSizeKey;
-            
-            static uint64_t defaultMaxSize;
+namespace broker {
 
-            const uint32_t maxCount;
-            const uint64_t maxSize;
-            uint32_t count;
-            uint64_t size;
+class QueuePolicy
+{
+    static uint64_t defaultMaxSize;
+
+    const uint32_t maxCount;
+    const uint64_t maxSize;
+    const std::string type;
+    qpid::sys::AtomicValue<uint32_t> count;
+    qpid::sys::AtomicValue<uint64_t> size;
+    bool policyExceeded;
             
-            static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+    static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+    static std::string getType(const qpid::framing::FieldTable& settings);
+
+  public:
+    static const std::string maxCountKey;
+    static const std::string maxSizeKey;
+    static const std::string typeKey;
+    static const std::string REJECT;
+    static const std::string FLOW_TO_DISK;
+    static const std::string RING;
+    static const std::string RING_STRICT;            
+
+    virtual ~QueuePolicy() {}
+    void tryEnqueue(const QueuedMessage&);
+    virtual void dequeued(const QueuedMessage&);
+    virtual bool isEnqueued(const QueuedMessage&);
+    virtual bool checkLimit(const QueuedMessage&);
+    void update(qpid::framing::FieldTable& settings);
+    uint32_t getMaxCount() const { return maxCount; }
+    uint64_t getMaxSize() const { return maxSize; }           
+
+    static std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
+    static std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+    static void setDefaultMaxSize(uint64_t);
+    friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
+  protected:
+    QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+    virtual void enqueued(const QueuedMessage&);
+    void enqueued(uint64_t size);
+    void dequeued(uint64_t size);
+};
+
+
+class FlowToDiskPolicy : public QueuePolicy
+{
+  public:
+    FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
+    bool checkLimit(const QueuedMessage&);
+};
+
+class RingQueuePolicy : public QueuePolicy
+{
+  public:
+    RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+    void enqueued(const QueuedMessage&);
+    void dequeued(const QueuedMessage&);
+    bool isEnqueued(const QueuedMessage&);
+    bool checkLimit(const QueuedMessage&);
+  private:
+    typedef std::deque<QueuedMessage> Messages;
+    qpid::sys::Mutex lock;
+    Messages queue;
+    const bool strict;
+};
 
-        public:
-            QueuePolicy(uint32_t maxCount, uint64_t maxSize);
-            QueuePolicy(const qpid::framing::FieldTable& settings);
-            void enqueued(uint64_t size);
-            void dequeued(uint64_t size);
-            void update(qpid::framing::FieldTable& settings);
-            bool limitExceeded();
-            uint32_t getMaxCount() const { return maxCount; }
-            uint64_t getMaxSize() const { return maxSize; }           
-
-            static void setDefaultMaxSize(uint64_t);
-	    friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
-        };
-    }
-}
+}}
 
 
 #endif

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=697603&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h Sun Sep 21 13:39:40 2008
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuedMessage_
+#define _QueuedMessage_
+
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+
+struct QueuedMessage
+{
+    boost::intrusive_ptr<Message> payload;
+    framing::SequenceNumber position;
+    Queue* queue;
+
+    QueuedMessage() : queue(0) {}
+    QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
+        payload(msg), position(sn), queue(q) {}
+    QueuedMessage(Queue* q) : queue(q) {}
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Sun Sep 21 13:39:40 2008
@@ -277,7 +277,7 @@
         parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
     } 
     if (acquire && !ackExpected) {
-        queue->dequeue(0, msg.payload);
+        queue->dequeue(0, msg);
     }
     return true;
 }

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Sun Sep 21 13:39:40 2008
@@ -22,3 +22,4 @@
 .valgrind.suppress
 .valgrindrc
 cluster_test
+echotest

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=697603&r1=697602&r2=697603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Sun Sep 21 13:39:40 2008
@@ -19,63 +19,142 @@
  *
  */
 #include "qpid/broker/QueuePolicy.h"
+#include "qpid/sys/Time.h"
 #include "unit_test.h"
+#include "MessageUtils.h"
+#include "BrokerFixture.h"
 
 using namespace qpid::broker;
+using namespace qpid::client;
 using namespace qpid::framing;
 
 QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
 
+QueuedMessage createMessage(uint32_t size)
+{
+    QueuedMessage msg;
+    msg.payload = MessageUtils::createMessage();
+    MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+    return msg;
+}
+
+
 QPID_AUTO_TEST_CASE(testCount)
 {
-    QueuePolicy policy(5, 0);
-    BOOST_CHECK(!policy.limitExceeded());
-    for (int i = 0; i < 5; i++) policy.enqueued(10);
-    BOOST_CHECK_EQUAL((uint64_t) 0, policy.getMaxSize());
-    BOOST_CHECK_EQUAL((uint32_t) 5, policy.getMaxCount());
-    BOOST_CHECK(!policy.limitExceeded());
-    policy.enqueued(10);
-    BOOST_CHECK(policy.limitExceeded());
-    policy.dequeued(10);
-    BOOST_CHECK(!policy.limitExceeded());
-    policy.enqueued(10);
-    BOOST_CHECK(policy.limitExceeded());        
+    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 0));
+    BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
+    BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
+
+    QueuedMessage msg = createMessage(10);
+    for (size_t i = 0; i < 5; i++) { 
+        policy->tryEnqueue(msg);
+    }
+    try {
+        policy->tryEnqueue(msg);        
+        BOOST_FAIL("Policy did not fail on enqueuing sixth message");
+    } catch (const ResourceLimitExceededException&) {}
+
+    policy->dequeued(msg);
+    policy->tryEnqueue(msg);
+
+    try {
+        policy->tryEnqueue(msg);        
+        BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
+    } catch (const ResourceLimitExceededException&) {}
 }
 
 QPID_AUTO_TEST_CASE(testSize)
 {
-    QueuePolicy policy(0, 50);
-    for (int i = 0; i < 5; i++) policy.enqueued(10);
-    BOOST_CHECK(!policy.limitExceeded());
-    policy.enqueued(10);
-    BOOST_CHECK(policy.limitExceeded());
-    policy.dequeued(10);
-    BOOST_CHECK(!policy.limitExceeded());
-    policy.enqueued(10);
-    BOOST_CHECK(policy.limitExceeded());        
+    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(0, 50));
+    QueuedMessage msg = createMessage(10);
+    
+    for (size_t i = 0; i < 5; i++) { 
+        policy->tryEnqueue(msg);
+    }
+    try {
+        policy->tryEnqueue(msg);        
+        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
+    } catch (const ResourceLimitExceededException&) {}
+
+    policy->dequeued(msg);
+    policy->tryEnqueue(msg);
+
+    try {
+        policy->tryEnqueue(msg);        
+        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
+    } catch (const ResourceLimitExceededException&) {}
 }
 
 QPID_AUTO_TEST_CASE(testBoth)
 {
-    QueuePolicy policy(5, 50);
-    for (int i = 0; i < 5; i++) policy.enqueued(11);
-    BOOST_CHECK(policy.limitExceeded());
-    policy.dequeued(20);
-    BOOST_CHECK(!policy.limitExceeded());//fails
-    policy.enqueued(5);
-    policy.enqueued(10);
-    BOOST_CHECK(policy.limitExceeded());
+    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 50));
+    try {
+        QueuedMessage msg = createMessage(51);
+        policy->tryEnqueue(msg);
+        BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
+    } catch (const ResourceLimitExceededException&) {}
+
+    std::vector<QueuedMessage> messages;
+    messages.push_back(createMessage(15));
+    messages.push_back(createMessage(10));
+    messages.push_back(createMessage(11));
+    messages.push_back(createMessage(2));
+    messages.push_back(createMessage(7));
+    for (size_t i = 0; i < messages.size(); i++) { 
+        policy->tryEnqueue(messages[i]);
+    }
+    //size = 45 at this point, count = 5
+    try {
+        QueuedMessage msg = createMessage(5);
+        policy->tryEnqueue(msg);
+        BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
+    } catch (const ResourceLimitExceededException&) {}
+    try {
+        QueuedMessage msg = createMessage(10);
+        policy->tryEnqueue(msg);
+        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
+    } catch (const ResourceLimitExceededException&) {}
+
+
+    policy->dequeued(messages[0]);
+    try {
+        QueuedMessage msg = createMessage(20);
+        policy->tryEnqueue(msg);
+    } catch (const ResourceLimitExceededException&) {
+        BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
+    }
 }
 
 QPID_AUTO_TEST_CASE(testSettings)
 {
     //test reading and writing the policy from/to field table
+    std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy(101, 303));
     FieldTable settings;
-    QueuePolicy a(101, 303);
-    a.update(settings);
-    QueuePolicy b(settings);
-    BOOST_CHECK_EQUAL(a.getMaxCount(), b.getMaxCount());
-    BOOST_CHECK_EQUAL(a.getMaxSize(), b.getMaxSize());
+    a->update(settings);
+    std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy(settings));
+    BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
+    BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
 }
 
+QPID_AUTO_TEST_CASE(testRingPolicy) 
+{
+    FieldTable args;
+    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING);
+    policy->update(args);
+
+    ProxySessionFixture f;
+    std::string q("my-ring-queue");
+    f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+    for (int i = 0; i < 10; i++) {
+        f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+    }
+    client::Message msg;
+    for (int i = 5; i < 10; i++) {
+        BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
+    }
+    BOOST_CHECK(!f.subs.get(msg, q));
+}
+
+
 QPID_AUTO_TEST_SUITE_END()