You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [3/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h Fri Aug 10 12:04:27 2012
@@ -29,7 +29,8 @@ namespace framing {
 class SequenceNumber;
 }
 namespace broker {
-struct QueuedMessage;
+class Message;
+class QueueCursor;
 
 /**
  * This interface abstracts out the access to the messages held for
@@ -39,8 +40,7 @@ struct QueuedMessage;
 class Messages
 {
   public:
-    typedef boost::function1<void, QueuedMessage&> Functor;
-    typedef boost::function1<bool, QueuedMessage&> Predicate;
+    typedef boost::function1<void, Message&> Functor;
 
     virtual ~Messages() {}
     /**
@@ -51,47 +51,44 @@ class Messages
     /**
      * Called when a message is deleted from the queue.
      */
-    virtual bool deleted(const QueuedMessage&) = 0;
+    virtual bool deleted(const QueueCursor&) = 0;
     /**
-     * Releases an acquired message, making it available again.
+     * Makes a message available.
      */
-    virtual void release(const QueuedMessage&) = 0;
+    virtual void publish(const Message& added) = 0;
     /**
-     * Acquire the message at the specified position, returning true
-     * if found, false otherwise. The acquired message is passed back
-     * via the second parameter.
-     */
-    virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0;
-    /**
-     * Find the message at the specified position, returning true if
-     * found, false otherwise. The matched message is passed back via
-     * the second parameter.
+     * Retrieve the next message for the given cursor. The message is
+     * passed back via the returned pointer.
+     *
+     * @return a pointer to the message if there is one, in which case
+     * the cursor argument is updated to point to that message; null
+     * otherwise.
      */
-    virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
+    virtual Message* next(QueueCursor& cursor) = 0;
+
     /**
-     * Retrieve the next message to be given to a browsing
-     * subscription that has reached the specified position. The next
-     * message is passed back via the second parameter.
+     * Release the message i.e. return it to the available state
+     * unless it has already been deleted.
      *
-     * @param unacquired, if true, will only browse unacquired messages
-     *
-     * @return true if there is another message, false otherwise.
+     * @return a pointer to the Message if it is still in acquired state and
+     * hence can be released; null if it has already been deleted
      */
-    virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0;
+    virtual Message* release(const QueueCursor& cursor) = 0;
     /**
-     * Retrieve the next message available for a consuming
-     * subscription.
-     *
-     * @return true if there is such a message, false otherwise.
+     * Find the message with the specified sequence number, returning
+     * a pointer if found, null otherwise. A cursor to the matched
+     * message can be passed back via the second parameter. Regardless
+     * of whether the message is found, using this cursor to call
+     * next() will give the next message greater than the specified
+     * sequence number, if one exists.
      */
-    virtual bool consume(QueuedMessage&) = 0;
+    virtual Message* find(const framing::SequenceNumber&, QueueCursor*) = 0;
+
     /**
-     * Pushes a message to the back of the 'queue'. For some types of
-     * queue this may cause another message to be removed; if that is
-     * the case the method will return true and the removed message
-     * will be passed out via the second parameter.
+     * Find the message at the specified position, returning a pointer if
+     * found, null otherwise.
      */
-    virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+    virtual Message* find(const QueueCursor&) = 0;
 
     /**
      * Add an already acquired message to the queue.
@@ -99,25 +96,11 @@ class Messages
      * Only need be implemented by subclasses that keep track of
      * acquired messages.
      */
-    virtual void updateAcquired(const QueuedMessage&) { }
-
-    /**
-     * Set the position of the back of the queue. Next message enqueued will be n+1.
-     *@pre Any messages with seq > n must already be dequeued.
-     */
-    virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
-
+    //virtual void updateAcquired(const QueuedMessage&) { }
     /**
      * Apply, the functor to each message held
      */
-
     virtual void foreach(Functor) = 0;
-    /**
-     * Remove every message held that for which the specified
-     * predicate returns true
-     */
-    virtual void removeIf(Predicate) = 0;
-
   private:
 };
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h Fri Aug 10 12:04:27 2012
@@ -32,7 +32,7 @@ namespace broker {
 /**
  * Base class for all persistable objects
  */
-class Persistable : public RefCounted
+class Persistable : public virtual RefCounted
 {
 public:
     /**

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri Aug 10 12:04:27 2012
@@ -29,132 +29,23 @@ using namespace qpid::broker;
 namespace qpid {
 namespace broker {
 
-class MessageStore;
-
 PersistableMessage::~PersistableMessage() {}
-
-PersistableMessage::PersistableMessage() :
-    asyncDequeueCounter(0),
-    store(0)
-{}
+PersistableMessage::PersistableMessage() : persistenceId(0) {}
 
 void PersistableMessage::flush()
 {
-    syncList copy;
-    {
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-	if (store) {
-	    copy = synclist;
-	} else {
-            return;//early exit as nothing to do
-	}
-    }
-    for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
-        PersistableQueue::shared_ptr q(i->lock());
-        if (q) {
-            q->flush();
-        }
-    } 
-}
-
-void PersistableMessage::setContentReleased()
-{
-    contentReleaseState.released = true;
-}
-
-bool PersistableMessage::isContentReleased() const
-{ 
-    return contentReleaseState.released;
-}
-       
-
-bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
-    if (store && (queue->getPersistenceId()!=0)) {
-        for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
-            PersistableQueue::shared_ptr q(i->lock());
-            if (q && q->getPersistenceId() == queue->getPersistenceId())  return true;
-        } 
-    }            
-    return false;
+    //TODO: is this really the right place for this?
 }
 
 
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
-    if (_store){
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-        store = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
-    }
-}
-
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
-    addToSyncList(queue, _store);
-    enqueueStart();
-}
-
-bool PersistableMessage::isDequeueComplete() { 
-    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-    return asyncDequeueCounter == 0;
-}
-    
-void PersistableMessage::dequeueComplete() { 
-    bool notify = false;
-    {
-        sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-        if (asyncDequeueCounter > 0) {
-            if (--asyncDequeueCounter == 0) {
-                notify = true;
-            }
-        }
-    }
-    if (notify) allDequeuesComplete();
-}
-
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
-    if (_store){
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-        store = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
-    }
-    dequeueAsync();
-}
-
-void PersistableMessage::dequeueAsync() { 
-    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-    asyncDequeueCounter++; 
-}
-
-PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
-
-void PersistableMessage::setStore(MessageStore* s)
-{
-    store = s;
-}
-
-void PersistableMessage::requestContentRelease()
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
 {
-    contentReleaseState.requested = true;
-}
-void PersistableMessage::blockContentRelease()
-{ 
-    contentReleaseState.blocked = true;
-}
-bool PersistableMessage::checkContentReleasable()
-{ 
-    return contentReleaseState.requested && !contentReleaseState.blocked;
-}
-
-bool PersistableMessage::isContentReleaseBlocked()
-{
-    return contentReleaseState.blocked;
+    enqueueStart();
 }
 
-bool PersistableMessage::isContentReleaseRequested()
-{
-    return contentReleaseState.requested;
-}
+bool PersistableMessage::isDequeueComplete() { return false; }
+void PersistableMessage::dequeueComplete() {}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
 
 }}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Aug 10 12:04:27 2012
@@ -24,29 +24,30 @@
 
 #include <string>
 #include <list>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <map>
+#include <boost/intrusive_ptr.hpp>
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/Persistable.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/framing/amqp_framing.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/PersistableQueue.h"
 #include "qpid/broker/AsyncCompletion.h"
 
 namespace qpid {
+namespace types {
+class Variant;
+}
 namespace broker {
 
 class MessageStore;
+class Queue;
 
 /**
  * Base class for persistable messages.
  */
 class PersistableMessage : public Persistable
 {
-    typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Mutex asyncDequeueLock;
-    sys::Mutex storeLock;
-
     /**
      * "Ingress" messages == messages sent _to_ the broker.
      * Tracks the number of outstanding asynchronous operations that must
@@ -56,85 +57,44 @@ class PersistableMessage : public Persis
      * operations have completed, the transfer of this message from the client
      * may be considered complete.
      */
-    AsyncCompletion ingressCompletion;
-
-    /**
-     * Tracks the number of outstanding asynchronous dequeue
-     * operations. When the message is dequeued asynchronously the
-     * count is incremented; when that dequeue completes it is
-     * decremented. Thus when it is 0, there are no outstanding
-     * dequeues.
-     */
-    int asyncDequeueCounter;
-
-    void dequeueAsync();
-
-    syncList synclist;
-    struct ContentReleaseState
-    {
-        bool blocked;
-        bool requested;
-        bool released;
-        
-        ContentReleaseState();
-    };
-    ContentReleaseState contentReleaseState;
-
-  protected:
-    /** Called when all dequeues are complete for this message. */
-    virtual void allDequeuesComplete() = 0;
-
-    void setContentReleased();
-
-    MessageStore* store;
-
+    boost::intrusive_ptr<AsyncCompletion> ingressCompletion;
+    mutable uint64_t persistenceId;
 
   public:
-    typedef boost::shared_ptr<PersistableMessage> shared_ptr;
-
-    /**
-     * @returns the size of the headers when encoded
-     */
-    virtual uint32_t encodedHeaderSize() const = 0;
-
     virtual ~PersistableMessage();
-
     PersistableMessage();
 
     void flush();
-    
-    QPID_BROKER_EXTERN bool isContentReleased() const;
 
     QPID_BROKER_EXTERN void setStore(MessageStore*);
-    void requestContentRelease();
-    void blockContentRelease();
-    bool checkContentReleasable();
-    bool isContentReleaseBlocked();
-    bool isContentReleaseRequested();
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
     /** track the progress of a message received by the broker - see ingressCompletion above */
-    QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
-    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
+    QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); }
+    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; }
+    QPID_BROKER_INLINE_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) { ingressCompletion = i; }
 
-    QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
-    QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
+    QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
+    QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
 
 
     QPID_BROKER_EXTERN bool isDequeueComplete();
-    
     QPID_BROKER_EXTERN void dequeueComplete();
-
     QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
 
-    bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
-    
-    void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
+    uint64_t getPersistenceId() const { return persistenceId; }
+    void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+
+    virtual void decodeHeader(framing::Buffer& buffer) = 0;
+    virtual void decodeContent(framing::Buffer& buffer) = 0;
+    virtual uint32_t encodedHeaderSize() const = 0;
+    virtual boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const = 0;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Fri Aug 10 12:04:27 2012
@@ -19,24 +19,53 @@
  *
  */
 #include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
+#include <algorithm>
 #include <cmath>
+#include <boost/bind.hpp>
 
 namespace qpid {
 namespace broker {
+namespace {
+class PriorityContext : public CursorContext {
+  public:
+    std::vector<QueueCursor> position;
+    PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
+};
+}
+
 
 PriorityQueue::PriorityQueue(int l) :
     levels(l),
-    messages(levels, Deque()),
-    frontLevel(0), haveFront(false), cached(false) {}
+    messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
+    counters(levels, framing::SequenceNumber()),
+    fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
+    frontLevel(0), haveFront(false), cached(false)
+{
+}
 
-bool PriorityQueue::deleted(const QueuedMessage& qm) {
-    bool deleted = fifo.deleted(qm);
-    if (deleted) erase(qm);
-    return deleted;
+bool PriorityQueue::deleted(const QueueCursor& c)
+{
+    MessagePointer* ptr = fifo.find(c);
+    if (ptr && ptr->holder) {
+        //mark the message as deleted
+        ptr->holder->message.setState(DELETED);
+        //clean the deque for the relevant priority level
+        boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context);
+        messages[ptr->holder->priority].clean();
+        //stop referencing that message holder (it may now have been
+        //deleted)
+        ptr->holder = 0;
+        //clean fifo index
+        fifo.clean();
+        return true;
+    } else {
+        return false;
+    }
 }
 
 size_t PriorityQueue::size()
@@ -44,85 +73,69 @@ size_t PriorityQueue::size()
     return fifo.size();
 }
 
-namespace {
-bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
-}
-
-void PriorityQueue::release(const QueuedMessage& message)
+Message* PriorityQueue::next(QueueCursor& cursor)
 {
-    QueuedMessage* qm = fifo.releasePtr(message);
-    if (qm) {
-        uint p = getPriorityLevel(message);
-        messages[p].insert(
-            lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
-        clearCache();
-    }
-}
-
-
-void PriorityQueue::erase(const QueuedMessage& qm) {
-    size_t i = getPriorityLevel(qm);
-    if (!messages[i].empty()) {
-        long diff = qm.position.getValue() - messages[i].front()->position.getValue();
-        if (diff < 0) return;
-        long maxEnd = std::min(size_t(diff), messages[i].size());
-        QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
-        Deque::iterator l =
-            lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
-        if (l != messages[i].end() && (*l)->position == qm.position) {
-            messages[i].erase(l);
-            clearCache();
-            return;
+    boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
+    if (!ctxt) {
+        ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER));
+        cursor.context = ctxt;
+    }
+    if (cursor.type == REPLICATOR) {
+        //browse in fifo order
+        MessagePointer* ptr = fifo.next(cursor);
+        return ptr ? &(ptr->holder->message) : 0;
+    } else if (cursor.type == PURGE) {
+        //iterate over messages in reverse priority order (i.e. purge lowest priority message first)
+        //ignore any fairshare configuration here as well
+        for (int p = 0; p < levels; ++p) {
+            MessageHolder* holder = messages[p].next(ctxt->position[p]);
+            if (holder) {
+                cursor.setPosition(holder->message.getSequence(), 0);
+                return &(holder->message);
+            }
         }
+        return 0;
+    } else {
+        //check each level in turn, in priority order, for any more messages
+        Priority p = firstLevel();
+        do {
+            MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
+            if (holder) {
+                cursor.setPosition(holder->message.getSequence(), 0);
+                return &(holder->message);
+            }
+        } while (nextLevel(p));
+        return 0;
     }
 }
 
-bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const QueueCursor& cursor)
 {
-    bool acquired = fifo.acquire(position, message);
-    if (acquired) erase(message); // No longer available
-    return acquired;
+    return find(cursor.position, 0);
 }
 
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
 {
-    return fifo.find(position, message);
+    MessagePointer* ptr = fifo.find(position, cursor);
+    return ptr ? &(ptr->holder->message) : 0;
 }
 
-bool PriorityQueue::browse(
-    const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+void PriorityQueue::publish(const Message& published)
 {
-    return fifo.browse(position, message, unacquired);
+    MessageHolder holder;
+    holder.message = published;
+    holder.priority = getPriorityLevel(published);
+    holder.id = ++(counters[holder.priority]);
+    MessagePointer pointer;
+    pointer.holder = &(messages[holder.priority].publish(holder));
+    pointer.id = published.getSequence();
+    fifo.publish(pointer);
 }
 
-bool PriorityQueue::consume(QueuedMessage& message)
+Message* PriorityQueue::release(const QueueCursor& cursor)
 {
-    if (checkFront()) {
-        QueuedMessage* pm = messages[frontLevel].front();
-        messages[frontLevel].pop_front();
-        clearCache();
-        pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
-        message = *pm;
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
-    QueuedMessage* qmp = fifo.pushPtr(added);
-    messages[getPriorityLevel(added)].push_back(qmp);
-    clearCache();
-    return false; // Adding a message never causes one to be removed for deque
-}
-
-void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
-    fifo.updateAcquired(acquired);
-}
-
-void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
-    fifo.setPosition(n);
+    MessagePointer* ptr = fifo.release(cursor);
+    return ptr ? &(ptr->holder->message) : 0;
 }
 
 void PriorityQueue::foreach(Functor f)
@@ -130,62 +143,87 @@ void PriorityQueue::foreach(Functor f)
     fifo.foreach(f);
 }
 
-void PriorityQueue::removeIf(Predicate p)
-{
-    for (int priority = 0; priority < levels; ++priority) {
-        for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
-            if (p(**i)) {
-                (*i)->status = QueuedMessage::DELETED; // Updates fifo index
-                i = messages[priority].erase(i);
-                clearCache();
-            } else {
-                ++i;
-            }
-        }
-    }
-    fifo.clean();
-}
-
-uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+uint PriorityQueue::getPriorityLevel(const Message& m) const
 {
-    uint priority = m.payload->getPriority();
+    uint priority = m.getPriority();
     //Use AMQP 0-10 approach to mapping priorities to a fixed level
     //(see rule priority-level-implementation)
     const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
     if (priority <= firstLevel) return 0;
     return std::min(priority - firstLevel, (uint)levels-1);
 }
+PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
+{
+    PriorityQueue::MessagePointer pointer;
+    pointer.holder = 0;
+    pointer.id = id;
+    return pointer;
+}
 
-void PriorityQueue::clearCache()
+PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
 {
-    cached = false;
+    PriorityQueue::MessageHolder holder;
+    holder.id = id;
+    holder.message.setState(DELETED);
+    return holder;
 }
 
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+PriorityQueue::Priority PriorityQueue::firstLevel()
 {
-    for (int p = levels-1; p >= 0; --p) {
-        if (!m[p].empty()) {
-            l = p;
-            return true;
-        }
+    return Priority(levels - 1);
+}
+bool PriorityQueue::nextLevel(Priority& p)
+{
+    if (p.current > 0) {
+        --(p.current);
+        return true;
+    } else {
+        return false;
     }
-    return false;
 }
 
-bool PriorityQueue::checkFront()
+framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
+{
+    return id;
+}
+void PriorityQueue::MessageHolder::setState(MessageState s)
 {
-    if (!cached) {
-        haveFront = findFrontLevel(frontLevel, messages);
-        cached = true;
+    message.setState(s);
+}
+MessageState PriorityQueue::MessageHolder::getState() const
+{
+    return message.getState();
+}
+PriorityQueue::MessageHolder::operator Message&()
+{
+    return message;
+}
+framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
+{
+    if (holder) {
+        return holder->message.getSequence();
+    } else {
+        //this is used when the instance is merely acting as padding
+        return id;
     }
-    return haveFront;
 }
-
-uint PriorityQueue::getPriority(const QueuedMessage& message)
+void PriorityQueue::MessagePointer::setState(MessageState s)
 {
-    const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
-    if (queue) return queue->getPriorityLevel(message);
-    else return 0;
+    if (holder) {
+        holder->message.setState(s);
+    }
+}
+MessageState PriorityQueue::MessagePointer::getState() const
+{
+    if (holder) {
+        return holder->message.getState();
+    } else {
+        return DELETED;
+    }
+}
+PriorityQueue::MessagePointer::operator Message&()
+{
+    assert(holder);
+    return holder->message;
 }
-
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h Fri Aug 10 12:04:27 2012
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/IndexedDeque.h"
 #include "qpid/sys/IntegerTypes.h"
 #include <deque>
 #include <vector>
@@ -44,42 +45,63 @@ class PriorityQueue : public Messages
     virtual ~PriorityQueue() {}
     size_t size();
 
-    bool deleted(const QueuedMessage&);
-    void release(const QueuedMessage&);
-    bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool find(const framing::SequenceNumber&, QueuedMessage&);
-    bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
-    bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void updateAcquired(const QueuedMessage& acquired);
-    void setPosition(const framing::SequenceNumber&);
-    void foreach(Functor);
-    void removeIf(Predicate);
-
-    static uint getPriority(const QueuedMessage&);
+    bool deleted(const QueueCursor&);
+    void publish(const Message& added);
+    Message* next(QueueCursor&);
+    Message* release(const QueueCursor& cursor);
+    Message* find(const QueueCursor&);
+    Message* find(const framing::SequenceNumber&, QueueCursor*);
 
+    void foreach(Functor);
+    static uint getPriority(const Message&);
   protected:
-    typedef std::deque<QueuedMessage*> Deque;
-    typedef std::vector<Deque> PriorityLevels;
-    virtual bool findFrontLevel(uint& p, PriorityLevels&);
-
     const int levels;
+    struct Priority
+    {
+        const int start;
+        int current;
+        Priority(int s) : start(s), current(start) {}
+    };
+    virtual Priority firstLevel();
+    virtual bool nextLevel(Priority& );
 
   private:
-    /** Available messages separated by priority and sorted in priority order.
-     *  Holds pointers to the QueuedMessages in fifo
+    struct MessageHolder
+    {
+        Message message;
+        int priority;
+        framing::SequenceNumber id;
+        framing::SequenceNumber getSequence() const;
+        void setState(MessageState);
+        MessageState getState() const;
+        operator Message&();
+    };
+    struct MessagePointer
+    {
+        MessageHolder* holder;
+        framing::SequenceNumber id;//used only for padding
+        framing::SequenceNumber getSequence() const;
+        void setState(MessageState);
+        MessageState getState() const;
+        operator Message&();
+    };
+    typedef IndexedDeque<MessageHolder> Deque;
+    typedef std::vector<Deque> PriorityLevels;
+    typedef std::vector<framing::SequenceNumber> Counters;
+
+    /** Holds the messages themselves, separated by priority level (the fifo index holds pointers to these).
      */
     PriorityLevels messages;
-    /** FIFO index of all messsagse (including acquired messages)  for fast browsing and indexing */
-    MessageDeque fifo;
+    Counters counters;
+    /** FIFO index of messages for fast browsing and indexing */
+    IndexedDeque<MessagePointer> fifo;
     uint frontLevel;
     bool haveFront;
     bool cached;
 
-    void erase(const QueuedMessage&);
-    uint getPriorityLevel(const QueuedMessage&) const;
-    void clearCache();
-    bool checkFront();
+    uint getPriorityLevel(const Message&) const;
+    MessageHolder priorityPadding(qpid::framing::SequenceNumber);
+    MessagePointer fifoPadding(qpid::framing::SequenceNumber);
 };
 
 }} // namespace qpid::broker



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org