You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC

svn commit: r1377715 [3/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/ cpp/src/qpid/asyncStore/ cpp/src/qpid...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h Mon Aug 27 15:40:33 2012
@@ -30,21 +30,22 @@
 
 namespace qpid {
     namespace broker {
-        class Message;
-        class MessageStore;
+        namespace amqp_0_10 {
+        class MessageTransfer;
+        }
 
         class QPID_BROKER_CLASS_EXTERN MessageBuilder : public framing::FrameHandler{
         public:
-            QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
+            QPID_BROKER_EXTERN MessageBuilder();
             QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);
-            boost::intrusive_ptr<Message> getMessage() { return message; }
+            boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage();
             QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
             void end();
         private:
             enum State {DORMANT, METHOD, HEADER, CONTENT};
             State state;
-            boost::intrusive_ptr<Message> message;
-            MessageStore* const store;
+            boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message;
+            std::string exchange;
 
             void checkType(uint8_t expected, uint8_t actual);
         };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp Mon Aug 27 15:40:33 2012
@@ -19,218 +19,71 @@
  *
  */
 #include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/log/Statement.h"
 #include "assert.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
+namespace {
+Message padding(qpid::framing::SequenceNumber id) {
+    Message m;
+    m.setState(DELETED);
+    m.setSequence(id);
+    return m;
+}
+}
 
-MessageDeque::MessageDeque() : available(0), head(0) {}
+using qpid::framing::SequenceNumber;
 
-size_t MessageDeque::index(const framing::SequenceNumber& position)
+MessageDeque::MessageDeque() : messages(&padding) {}
+
+
+bool MessageDeque::deleted(const QueueCursor& cursor)
 {
-    //assuming a monotonic sequence, with no messages removed except
-    //from the ends of the deque, we can use the position to determin
-    //an index into the deque
-    if (messages.empty() || position < messages.front().position) return 0;
-    return position - messages.front().position;
-}
-
-bool MessageDeque::deleted(const QueuedMessage& m)
-{
-    size_t i = index(m.position);
-    if (i < messages.size()) {
-        QueuedMessage *qm = &messages[i];
-        if (qm->status != QueuedMessage::DELETED) {
-            qm->status = QueuedMessage::DELETED;
-            qm->payload = 0; // message no longer needed
-            clean();
-            return true;
-        }
-    }
-    return false;
+    return messages.deleted(cursor);
 }
 
-size_t MessageDeque::size()
+void MessageDeque::publish(const Message& added)
 {
-    return available;
+    messages.publish(added);
 }
 
-QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
+Message* MessageDeque::release(const QueueCursor& cursor)
 {
-    size_t i = index(message.position);
-    if (i < messages.size()) {
-        QueuedMessage& m = messages[i];
-        if (m.status == QueuedMessage::ACQUIRED) {
-            if (head > i) head = i;
-            m.status = QueuedMessage::AVAILABLE;
-            ++available;
-            return &messages[i];
-        }
-    } else {
-        assert(0);
-        QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
-    }
-    return 0;
-}
-
-void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
-
-bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
-{
-    if (position < messages.front().position) return false;
-    size_t i = index(position);
-    if (i < messages.size()) {
-        QueuedMessage& temp = messages[i];
-        if (temp.status == QueuedMessage::AVAILABLE) {
-            temp.status = QueuedMessage::ACQUIRED;
-            --available;
-            message = temp;
-            return true;
-        }
-    }
-    return false;
-}
-
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
-{
-    size_t i = index(position);
-    if (i < messages.size()) {
-        message = messages[i];
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
-{
-    //get first message that is greater than position
-    size_t i = index(position + 1);
-    while (i < messages.size()) {
-        QueuedMessage& m = messages[i++];
-        if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
-            message = m;
-            return true;
-        }
-    }
-    return false;
-}
-
-bool MessageDeque::consume(QueuedMessage& message)
-{
-    while (head < messages.size()) {
-        QueuedMessage& i = messages[head++];
-        if (i.status == QueuedMessage::AVAILABLE) {
-            i.status = QueuedMessage::ACQUIRED;
-            --available;
-            message = i;
-            return true;
-        }
-    }
-    return false;
+    return messages.release(cursor);
 }
 
-namespace {
-QueuedMessage padding(uint32_t pos) {
-    return QueuedMessage(0, 0, pos, QueuedMessage::DELETED);
+Message* MessageDeque::next(QueueCursor& cursor)
+{
+    return messages.next(cursor);
 }
-} // namespace
 
-QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
-    //add padding to prevent gaps in sequence, which break the index
-    //calculation (needed for queue replication)
-    while (messages.size() && (added.position - messages.back().position) > 1)
-        messages.push_back(padding(messages.back().position + 1));
-    messages.push_back(added);
-    messages.back().status = QueuedMessage::AVAILABLE;
-    if (head >= messages.size()) head = messages.size() - 1;
-    ++available;
-    clean();  // QPID-4046: let producer help clean the backlog of deleted messages
-    return &messages.back();
-}
-
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
-    pushPtr(added);
-    return false; // adding a message never causes one to be removed for deque
-}
-
-void MessageDeque::updateAcquired(const QueuedMessage& acquired)
-{
-    // Pad the front of the queue if necessary
-    while (messages.size() && (acquired.position < messages.front().position))
-        messages.push_front(padding(uint32_t(messages.front().position) - 1));
-    size_t i = index(acquired.position);
-    if (i < messages.size()) {  // Replace an existing padding message
-        assert(messages[i].status == QueuedMessage::DELETED);
-        messages[i] = acquired;
-        messages[i].status = QueuedMessage::ACQUIRED;
-    }
-    else {                      // Push to the back
-        // Pad the back of the queue if necessary
-        while (messages.size() && (acquired.position - messages.back().position) > 1)
-            messages.push_back(padding(messages.back().position + 1));
-        assert(!messages.size() || (acquired.position - messages.back().position) == 1);
-        messages.push_back(acquired);
-        messages.back().status = QueuedMessage::ACQUIRED;
-    }
+size_t MessageDeque::size()
+{
+    return messages.size();
 }
 
-namespace {
-bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
-} // namespace
+Message* MessageDeque::find(const framing::SequenceNumber& position, QueueCursor* cursor)
+{
+    return messages.find(position, cursor);
+}
 
-void MessageDeque::setPosition(const framing::SequenceNumber& n) {
-    size_t i = index(n+1);
-    if (i >= messages.size()) return; // Nothing to do.
-
-    // Assertion to verify the precondition: no messaages after n.
-    assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
-           messages.end());
-    messages.erase(messages.begin()+i, messages.end());
-    if (head >= messages.size()) head = messages.size() - 1;
-    // Re-count the available messages
-    available = 0;
-    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->status == QueuedMessage::AVAILABLE) ++available;
-    }
-}
-
-void MessageDeque::clean()
-{
-    // QPID-4046: If a queue has multiple consumers, then it is possible for a large
-    // collection of deleted messages to build up.  Limit the number of messages cleaned
-    // up on each call to clean().
-    size_t count = 0;
-    while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
-        messages.pop_front();
-        count += 1;
-    }
-    head = (head > count) ? head - count : 0;
+Message* MessageDeque::find(const QueueCursor& cursor)
+{
+    return messages.find(cursor);
 }
 
 void MessageDeque::foreach(Functor f)
 {
-    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->status == QueuedMessage::AVAILABLE) {
-            f(*i);
-        }
-    }
-}
-
-void MessageDeque::removeIf(Predicate p)
-{
-    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
-            //Use special status for this as messages are not yet
-            //dequeued, but should not be considered on the queue
-            //either (used for purging and moving)
-            i->status = QueuedMessage::REMOVED;
-            --available;
-        }
-    }
-    clean();
+    messages.foreach(f);
+}
+
+void MessageDeque::resetCursors()
+{
+    messages.resetCursors();
 }
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h Mon Aug 27 15:40:33 2012
@@ -22,8 +22,7 @@
  *
  */
 #include "qpid/broker/Messages.h"
-#include "qpid/broker/QueuedMessage.h"
-#include <deque>
+#include "qpid/broker/IndexedDeque.h"
 
 namespace qpid {
 namespace broker {
@@ -36,31 +35,20 @@ class MessageDeque : public Messages
   public:
     MessageDeque();
     size_t size();
-    bool deleted(const QueuedMessage&);
-    void release(const QueuedMessage&);
-    bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool find(const framing::SequenceNumber&, QueuedMessage&);
-    bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
-    bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void updateAcquired(const QueuedMessage& acquired);
-    void setPosition(const framing::SequenceNumber&);
+    bool deleted(const QueueCursor&);
+    void publish(const Message& added);
+    Message* next(QueueCursor&);
+    Message* release(const QueueCursor& cursor);
+    Message* find(const QueueCursor&);
+    Message* find(const framing::SequenceNumber&, QueueCursor*);
+
     void foreach(Functor);
-    void removeIf(Predicate);
 
-    // For use by other Messages implementations that use MessageDeque as a FIFO index
-    // and keep pointers to its elements in their own indexing strctures.
-    void clean();
-    QueuedMessage* releasePtr(const QueuedMessage&);
-    QueuedMessage* pushPtr(const QueuedMessage& added);
+    void resetCursors();
 
   private:
-    typedef std::deque<QueuedMessage> Deque;
+    typedef IndexedDeque<Message> Deque;
     Deque messages;
-    size_t available;
-    size_t head;
-
-    size_t index(const framing::SequenceNumber&);
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h Mon Aug 27 15:40:33 2012
@@ -21,51 +21,28 @@
  * under the License.
  *
  */
-
+#include "qpid/types/Variant.h"
 /** Abstraction used by Queue to determine the next "most desirable" message to provide to
  * a particular consuming client
  */
 
-
-#include "qpid/broker/Consumer.h"
-
 namespace qpid {
 namespace broker {
 
-struct QueuedMessage;
+class Message;
 
 class MessageDistributor
 {
  public:
     virtual ~MessageDistributor() {};
 
-    /** Locking Note: all methods assume the caller is holding the Queue::messageLock
-     * during the method call.
-     */
-
-    /** Determine the next message available for consumption by the consumer
-     * @param consumer the consumer that needs a message to consume
-     * @param next set to the next message that the consumer may consume.
-     * @return true if message is available and next is set
-     */
-    virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
-                                        QueuedMessage& next ) = 0;
-
-    /** Allow the comsumer to take ownership of the given message.
+    /**
+     * Determine whether the named consumer can take ownership of the specified message.
      * @param consumer the name of the consumer that is attempting to acquire the message
-     * @param qm the message to be acquired, previously returned from nextConsumableMessage()
+     * @param target the message to be acquired
      * @return true if ownership is permitted, false if ownership cannot be assigned.
      */
-    virtual bool allocate( const std::string& consumer,
-                           const QueuedMessage& target) = 0;
-
-    /** Determine the next message available for browsing by the consumer
-     * @param consumer the consumer that is browsing the queue
-     * @param next set to the next message that the consumer may browse.
-     * @return true if a message is available and next is returned
-     */
-    virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
-                                       QueuedMessage& next ) = 0;
+    virtual bool acquire(const std::string& consumer, Message& target) = 0;
 
     /** hook to add any interesting management state to the status map */
     virtual void query(qpid::types::Variant::Map&) const = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp Mon Aug 27 15:40:33 2012
@@ -1,4 +1,4 @@
-/*
+ /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,10 +20,16 @@
  */
 
 #include "qpid/broker/MessageGroupManager.h"
-
-#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/framing/TypeCode.h"
+#include "qpid/types/Variant.h"
 #include "qpid/log/Statement.h"
 #include "qpid/types/Variant.h"
 
@@ -75,24 +81,16 @@ void MessageGroupManager::disown( GroupS
     freeGroups[state.members.front().position] = &state;
 }
 
-MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
+MessageGroupManager::GroupState& MessageGroupManager::findGroup( const Message& m )
 {
-    uint32_t thisMsg = qm.position.getValue();
+    uint32_t thisMsg = m.getSequence().getValue();
     if (cachedGroup && lastMsg == thisMsg) {
         hits++;
         return *cachedGroup;
     }
 
-    std::string group = defaultGroupId;
-    const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
-    if (headers) {
-        qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
-        if (id && id->convertsTo<std::string>()) {
-            std::string tmp = id->get<std::string>();
-            if (!tmp.empty())   // empty group is reserved
-                group = tmp;
-        }
-    }
+    std::string group = m.getPropertyAsString(groupIdHeader);
+    if (group.empty()) group = defaultGroupId; //empty group is reserved
 
     if (cachedGroup && group == lastGroup) {
         hits++;
@@ -112,48 +110,48 @@ MessageGroupManager::GroupState& Message
 }
 
 
-void MessageGroupManager::enqueued( const QueuedMessage& qm )
+void MessageGroupManager::enqueued( const Message& m )
 {
     // @todo KAG optimization - store reference to group state in QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    GroupState::MessageState mState(qm.position);
+    GroupState& state = findGroup(m);
+    GroupState::MessageState mState(m.getSequence());
     state.members.push_back(mState);
     uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << qName <<
               ": added message to group id=" << state.group << " total=" << total );
     if (total == 1) {
         // newly created group, no owner
-        assert(freeGroups.find(qm.position) == freeGroups.end());
-        freeGroups[qm.position] = &state;
+        assert(freeGroups.find(m.getSequence()) == freeGroups.end());
+        freeGroups[m.getSequence()] = &state;
     }
 }
 
 
-void MessageGroupManager::acquired( const QueuedMessage& qm )
+void MessageGroupManager::acquired( const Message& m )
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
-    assert(m != state.members.end());
-    m->acquired = true;
+    GroupState& state = findGroup(m);
+    GroupState::MessageFifo::iterator gm = state.findMsg(m.getSequence());
+    assert(gm != state.members.end());
+    gm->acquired = true;
     state.acquired += 1;
     QPID_LOG( trace, "group queue " << qName <<
               ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
 }
 
 
-void MessageGroupManager::requeued( const QueuedMessage& qm )
+void MessageGroupManager::requeued( const Message& m )
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
+    GroupState& state = findGroup(m);
     assert( state.acquired != 0 );
     state.acquired -= 1;
-    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
-    assert(m != state.members.end());
-    m->acquired = false;
+    GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
+    assert(i != state.members.end());
+    i->acquired = false;
     if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" << state.group);
@@ -164,14 +162,14 @@ void MessageGroupManager::requeued( cons
 }
 
 
-void MessageGroupManager::dequeued( const QueuedMessage& qm )
+void MessageGroupManager::dequeued( const Message& m )
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
-    assert(m != state.members.end());
-    if (m->acquired) {
+    GroupState& state = findGroup(m);
+    GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
+    assert(i != state.members.end());
+    if (i->acquired) {
         assert( state.acquired != 0 );
         state.acquired -= 1;
     }
@@ -179,7 +177,7 @@ void MessageGroupManager::dequeued( cons
     // special case if qm is first (oldest) message in the group:
     // may need to re-insert it back on the freeGroups list, as the index will change
     bool reFreeNeeded = false;
-    if (m == state.members.begin()) {
+    if (i == state.members.begin()) {
         if (!state.owned()) {
             // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
             // if on freelist, it is indexed by first member, which is about to be removed!
@@ -188,7 +186,7 @@ void MessageGroupManager::dequeued( cons
         }
         state.members.pop_front();
     } else {
-        state.members.erase(m);
+        state.members.erase(i);
     }
 
     uint32_t total = state.members.size();
@@ -206,6 +204,12 @@ void MessageGroupManager::dequeued( cons
         QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" << state.group);
         disown(state);
+        MessageDeque* md = dynamic_cast<MessageDeque*>(&messages);
+        if (md) {
+            md->resetCursors();
+        } else {
+            QPID_LOG(warning, "Could not reset cursors for message group, unexpected container type");
+        }
     } else if (reFreeNeeded) {
         disown(state);
     }
@@ -215,55 +219,27 @@ MessageGroupManager::~MessageGroupManage
 {
     QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
 }
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+
+bool MessageGroupManager::acquire(const std::string& consumer, Message& m)
 {
-    if (!messages.size())
-        return false;
+    if (m.getState() == AVAILABLE) {
+        // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+        GroupState& state = findGroup(m);
 
-    next.position = c->getPosition();
-    if (!freeGroups.empty()) {
-        const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
-        if (nextFree <= next.position) {  // take oldest free
-            next.position = nextFree;
-            --next.position;
+        if (!state.owned()) {
+            own( state, consumer );
+            QPID_LOG( trace, "group queue " << qName <<
+                      ": consumer name=" << consumer << " has acquired group id=" << state.group);
         }
-    }
-
-    while (messages.browse( next.position, next, true )) {
-        GroupState& group = findGroup(next);
-        if (!group.owned()) {
-            //TODO: make acquire more efficient when we already have the message in question
-            if (group.members.front().position == next.position && messages.acquire(next.position, next)) {    // only take from head!
-                return true;
-            }
-            QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
-                     << "'s head message still pending. pos=" << group.members.front().position);
-        } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
+        if (state.owner == consumer) {
+            m.setState(ACQUIRED);
             return true;
+        } else {
+            return false;
         }
+    } else {
+        return false;
     }
-    return false;
-}
-
-
-bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
-{
-    // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
-    GroupState& state = findGroup(qm);
-
-    if (!state.owned()) {
-        own( state, consumer );
-        QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << consumer << " has acquired group id=" << state.group);
-        return true;
-    }
-    return state.owner == consumer;
-}
-
-bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
-    // browse: allow access to any available msg, regardless of group ownership (?ok?)
-    return messages.browse(c->getPosition(), next, false);
 }
 
 void MessageGroupManager::query(qpid::types::Variant::Map& status) const
@@ -296,11 +272,9 @@ void MessageGroupManager::query(qpid::ty
         // set the timestamp to the arrival timestamp of the oldest (HEAD) message, if present
         info[GROUP_TIMESTAMP] = 0;
         if (g->second.members.size() != 0) {
-            QueuedMessage qm;
-            if (messages.find(g->second.members.front().position, qm) &&
-                qm.payload &&
-                qm.payload->hasProperties<framing::DeliveryProperties>()) {
-                info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
+            Message* m = messages.find(g->second.members.front().position, 0);
+            if (m && m->getTimestamp()) {
+                info[GROUP_TIMESTAMP] = m->getTimestamp();
             }
         }
         info[GROUP_CONSUMER] = g->second.owner;
@@ -313,33 +287,13 @@ void MessageGroupManager::query(qpid::ty
 
 boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
                                                                     Messages& messages,
-                                                                    const qpid::framing::FieldTable& settings )
+                                                                    const QueueSettings& settings )
 {
-    boost::shared_ptr<MessageGroupManager> empty;
-
-    if (settings.isSet(qpidMessageGroupKey)) {
-
-        // @todo: remove once "sticky" consumers are supported - see QPID-3347
-        if (!settings.isSet(qpidSharedGroup)) {
-            QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
-            return empty;
-        }
-
-        std::string headerKey = settings.getAsString(qpidMessageGroupKey);
-        if (headerKey.empty()) {
-            QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
-            return empty;
-        }
-        unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
-
-        boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
-
-        QPID_LOG( debug, "Configured Queue '" << qName <<
-                  "' for message grouping using header key '" << headerKey << "'" <<
-                  " (timestamp=" << timestamp << ")");
-        return manager;
-    }
-    return empty;
+    boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( settings.groupKey, qName, messages, settings.addTimestamp ) );
+    QPID_LOG( debug, "Configured Queue '" << qName <<
+              "' for message grouping using header key '" << settings.groupKey << "'" <<
+              " (timestamp=" << settings.addTimestamp << ")");
+    return manager;
 }
 
 std::string MessageGroupManager::defaultGroupId;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h Mon Aug 27 15:40:33 2012
@@ -24,8 +24,10 @@
 
 /* for managing message grouping on Queues */
 
+#include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/MessageDistributor.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/unordered_map.h"
 
 #include <deque>
@@ -34,6 +36,7 @@ namespace qpid {
 namespace broker {
 
 class QueueObserver;
+struct QueueSettings;
 class MessageDistributor;
 class Messages;
 
@@ -76,11 +79,7 @@ class MessageGroupManager : public State
     GroupFifo freeGroups;   // ordered by oldest free msg
     //Consumers consumers;    // index: consumer name
 
-    static const std::string qpidMessageGroupKey;
-    static const std::string qpidSharedGroup;   // if specified, one group can be consumed by multiple receivers
-    static const std::string qpidMessageGroupTimestamp;
-
-    GroupState& findGroup( const QueuedMessage& qm );
+    GroupState& findGroup( const Message& m );
     unsigned long hits, misses; // for debug
     uint32_t lastMsg;
     std::string lastGroup;
@@ -91,11 +90,14 @@ class MessageGroupManager : public State
     void disown( GroupState& state );
 
  public:
+    static const std::string qpidMessageGroupKey;
+    static const std::string qpidSharedGroup;   // if specified, one group can be consumed by multiple receivers
+    static const std::string qpidMessageGroupTimestamp;
 
     static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
     static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
                                                           Messages& messages,
-                                                          const qpid::framing::FieldTable& settings );
+                                                          const QueueSettings& settings );
 
     MessageGroupManager(const std::string& header, const std::string& _qName,
                         Messages& container, unsigned int _timestamp=0 )
@@ -106,22 +108,20 @@ class MessageGroupManager : public State
     virtual ~MessageGroupManager();
 
     // QueueObserver iface
-    void enqueued( const QueuedMessage& qm );
-    void acquired( const QueuedMessage& qm );
-    void requeued( const QueuedMessage& qm );
-    void dequeued( const QueuedMessage& qm );
+    void enqueued( const Message& qm );
+    void acquired( const Message& qm );
+    void requeued( const Message& qm );
+    void dequeued( const Message& qm );
     void consumerAdded( const Consumer& ) {};
     void consumerRemoved( const Consumer& ) {};
     void getState(qpid::framing::FieldTable& state ) const;
     void setState(const qpid::framing::FieldTable&);
 
     // MessageDistributor iface
-    bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
-    bool allocate(const std::string& c, const QueuedMessage& qm);
-    bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+    bool acquire(const std::string& c, Message& );
     void query(qpid::types::Variant::Map&) const;
 
-    bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+    bool match(const qpid::types::Variant::Map*, const Message&) const;
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp Mon Aug 27 15:40:33 2012
@@ -19,7 +19,8 @@
  *
  */
 #include "qpid/broker/MessageMap.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/log/Statement.h"
 #include <algorithm>
 
@@ -29,29 +30,17 @@ namespace {
 const std::string EMPTY;
 }
 
-bool MessageMap::deleted(const QueuedMessage& message)
-{
-    Ordering::iterator i = messages.find(message.position);
-    if (i != messages.end()) {
-        erase(i);
-        return true;
-    } else {
-        return false;
-    }
-}
 
-std::string MessageMap::getKey(const QueuedMessage& message)
+std::string MessageMap::getKey(const Message& message)
 {
-    const framing::FieldTable* ft = message.payload->getApplicationHeaders();
-    if (ft) return ft->getAsString(key);
-    else return EMPTY;
+    return message.getPropertyAsString(key);
 }
 
 size_t MessageMap::size()
 {
     size_t count(0);
     for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+        if (i->second.getState() == AVAILABLE) ++count;
     }
     return count;
 }
@@ -61,116 +50,103 @@ bool MessageMap::empty()
     return size() == 0;//TODO: more efficient implementation
 }
 
-void MessageMap::release(const QueuedMessage& message)
-{
-    Ordering::iterator i = messages.find(message.position);
-    if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
-        i->second.status = QueuedMessage::AVAILABLE;
-    }
-}
-
-bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::deleted(const QueueCursor& cursor)
 {
-    Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
-        i->second.status = QueuedMessage::ACQUIRED;
-        message = i->second;
+    Ordering::iterator i = messages.find(cursor.position);
+    if (i != messages.end()) {
+        erase(i);
         return true;
     } else {
         return false;
     }
 }
 
-bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* MessageMap::find(const QueueCursor& cursor)
 {
-    Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
-        message = i->second;
-        return true;
-    } else {
-        return false;
-    }
+    if (cursor.valid) return find(cursor.position, 0);
+    else return 0;
 }
 
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor* cursor)
 {
-    Ordering::iterator i = messages.lower_bound(position+1);
-    if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE  || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
-        message = i->second;
-        return true;
+    Ordering::iterator i = messages.lower_bound(position);
+    if (i != messages.end()) {
+        if (cursor) cursor->setPosition(i->first, version);
+        if (i->first == position) return &(i->second);
+        else return 0;
     } else {
-        return false;
+        //there is no message whose sequence is greater than position,
+        //i.e. haven't got there yet
+        if (cursor) cursor->setPosition(position, version);
+        return 0;
     }
 }
 
-bool MessageMap::consume(QueuedMessage& message)
+Message* MessageMap::next(QueueCursor& cursor)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->second.status == QueuedMessage::AVAILABLE) {
-            i->second.status = QueuedMessage::ACQUIRED;
-            message = i->second;
-            return true;
+    Ordering::iterator i;
+    if (!cursor.valid) i = messages.begin(); //start with oldest message
+    else i = messages.upper_bound(cursor.position); //get first message that is greater than position
+
+    while (i != messages.end()) {
+        Message& m = i->second;
+        cursor.setPosition(m.getSequence(), version);
+        if (cursor.check(m)) {
+            return &m;
+        } else {
+            ++i;
         }
     }
-    return false;
+    return 0;
 }
 
-const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+const Message& MessageMap::replace(const Message& original, const Message& update)
 {
-    messages.erase(original.position);
-    messages[update.position] = update;
-    return update;
+    messages.erase(original.getSequence());
+    std::pair<Ordering::iterator, bool> i = messages.insert(Ordering::value_type(update.getSequence(), update));
+    i.first->second.setState(AVAILABLE);
+    return i.first->second;
 }
 
-bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+void MessageMap::publish(const Message& added)
+{
+    Message dummy;
+    update(added, dummy);
+}
+
+bool MessageMap::update(const Message& added, Message& removed)
 {
     std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
     if (result.second) {
         //there was no previous message for this key; nothing needs to
         //be removed, just add the message into its correct position
-        QueuedMessage& a = messages[added.position];
-        a = added;
-        a.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Added message " << a);
+        messages.insert(Ordering::value_type(added.getSequence(), added)).first->second.setState(AVAILABLE);
         return false;
     } else {
         //there is already a message with that key which needs to be replaced
         removed = result.first->second;
         result.first->second = replace(result.first->second, added);
-        result.first->second.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
+        result.first->second.setState(AVAILABLE);
+        QPID_LOG(debug, "Displaced message at " << removed.getSequence() << " with " << result.first->second.getSequence() << ": " << result.first->first);
         return true;
     }
 }
 
-void MessageMap::setPosition(const framing::SequenceNumber& seq) {
-    // Nothing to do, just assert that the precondition is respected and there
-    // are no undeleted messages after seq.
-    (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
-}
-
-void MessageMap::foreach(Functor f)
+Message* MessageMap::release(const QueueCursor& cursor)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
+    Ordering::iterator i = messages.find(cursor.position);
+    if (i != messages.end()) {
+        i->second.setState(AVAILABLE);
+        return &i->second;
+    } else {
+        return 0;
     }
 }
 
-void MessageMap::removeIf(Predicate p)
+void MessageMap::foreach(Functor f)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end();) {
-        if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
-            index.erase(getKey(i->second));
-            //Note: Removing from messages means that the subsequent
-            //call to deleted() for the same message will return
-            //false. At present that is not a problem. If this were
-            //changed to hold onto the message until dequeued
-            //(e.g. with REMOVED state), then the erase() below would
-            //need to take that into account.
-            messages.erase(i++);
-        } else {
-            ++i;
-        }
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->second.getState() == AVAILABLE) f(i->second);
     }
 }
 
@@ -180,6 +156,6 @@ void MessageMap::erase(Ordering::iterato
     messages.erase(i);
 }
 
-MessageMap::MessageMap(const std::string& k) : key(k) {}
+MessageMap::MessageMap(const std::string& k) : key(k), version(0) {}
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h Mon Aug 27 15:40:33 2012
@@ -6,7 +6,7 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
-o * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -22,6 +22,7 @@ o * regarding copyright ownership.  The 
  *
  */
 #include "qpid/broker/Messages.h"
+#include "qpid/broker/Message.h"
 #include "qpid/framing/SequenceNumber.h"
 #include <map>
 #include <string>
@@ -38,32 +39,31 @@ class MessageMap : public Messages
 {
   public:
     MessageMap(const std::string& key);
-    virtual ~MessageMap() {}
 
     size_t size();
     bool empty();
 
-    virtual bool deleted(const QueuedMessage&);
-    void release(const QueuedMessage&);
-    virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool find(const framing::SequenceNumber&, QueuedMessage&);
-    virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
-    virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void setPosition(const framing::SequenceNumber&);
+    bool deleted(const QueueCursor&);
+    void publish(const Message& added);//use update instead to get replaced message
+    Message* next(QueueCursor&);
+    Message* release(const QueueCursor& cursor);
+    Message* find(const QueueCursor&);
+    Message* find(const framing::SequenceNumber&, QueueCursor*);
 
     void foreach(Functor);
-    virtual void removeIf(Predicate);
+
+    bool update(const Message& added, Message& removed);
 
   protected:
-    typedef std::map<std::string, QueuedMessage> Index;
-    typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering;
+    typedef std::map<std::string, Message> Index;
+    typedef std::map<framing::SequenceNumber, Message> Ordering;
     const std::string key;
     Index index;
     Ordering messages;
+    int32_t version;
 
-    std::string getKey(const QueuedMessage&);
-    virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+    std::string getKey(const Message&);
+    virtual const Message& replace(const Message&, const Message&);
     void erase(Ordering::iterator);
 };
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h Mon Aug 27 15:40:33 2012
@@ -29,7 +29,8 @@ namespace framing {
 class SequenceNumber;
 }
 namespace broker {
-struct QueuedMessage;
+class Message;
+class QueueCursor;
 
 /**
  * This interface abstracts out the access to the messages held for
@@ -39,8 +40,7 @@ struct QueuedMessage;
 class Messages
 {
   public:
-    typedef boost::function1<void, QueuedMessage&> Functor;
-    typedef boost::function1<bool, QueuedMessage&> Predicate;
+    typedef boost::function1<void, Message&> Functor;
 
     virtual ~Messages() {}
     /**
@@ -51,47 +51,44 @@ class Messages
     /**
      * Called when a message is deleted from the queue.
      */
-    virtual bool deleted(const QueuedMessage&) = 0;
+    virtual bool deleted(const QueueCursor&) = 0;
     /**
-     * Releases an acquired message, making it available again.
+     * Makes a message available.
      */
-    virtual void release(const QueuedMessage&) = 0;
+    virtual void publish(const Message& added) = 0;
     /**
-     * Acquire the message at the specified position, returning true
-     * if found, false otherwise. The acquired message is passed back
-     * via the second parameter.
-     */
-    virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0;
-    /**
-     * Find the message at the specified position, returning true if
-     * found, false otherwise. The matched message is passed back via
-     * the second parameter.
+     * Retrieve the next message for the given cursor. The message is
+     * passed back via the returned pointer.
+     *
+     * @return a pointer to the message if there is one, in which case
+     * the cursor that points to it is assigned to cursor; null
+     * otherwise.
      */
-    virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
+    virtual Message* next(QueueCursor& cursor) = 0;
+
     /**
-     * Retrieve the next message to be given to a browsing
-     * subscription that has reached the specified position. The next
-     * message is passed back via the second parameter.
+     * Release the message i.e. return it to the available state
+     * unless it has already been deleted.
      *
-     * @param unacquired, if true, will only browse unacquired messages
-     *
-     * @return true if there is another message, false otherwise.
+     * @return a pointer to the Message if it is still in acquired state and
+     * hence can be released; null if it has already been deleted
      */
-    virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0;
+    virtual Message* release(const QueueCursor& cursor) = 0;
     /**
-     * Retrieve the next message available for a consuming
-     * subscription.
-     *
-     * @return true if there is such a message, false otherwise.
+     * Find the message with the specified sequence number, returning
+     * a pointer if found, null otherwise. A cursor to the matched
+     * message can be passed back via the second parameter. Regardless
+     * of whether the message is found, using this cursor to call
+     * next() will give the next message greater than position if one
+     * exists.
      */
-    virtual bool consume(QueuedMessage&) = 0;
+    virtual Message* find(const framing::SequenceNumber&, QueueCursor*) = 0;
+
     /**
-     * Pushes a message to the back of the 'queue'. For some types of
-     * queue this may cause another message to be removed; if that is
-     * the case the method will return true and the removed message
-     * will be passed out via the second parameter.
+     * Find the message at the specified position, returning a pointer if
+     * found, null otherwise.
      */
-    virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+    virtual Message* find(const QueueCursor&) = 0;
 
     /**
      * Add an already acquired message to the queue.
@@ -99,25 +96,11 @@ class Messages
      * Only need be implemented by subclasses that keep track of
      * acquired messages.
      */
-    virtual void updateAcquired(const QueuedMessage&) { }
-
-    /**
-     * Set the position of the back of the queue. Next message enqueued will be n+1.
-     *@pre Any messages with seq > n must already be dequeued.
-     */
-    virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
-
+    //virtual void updateAcquired(const QueuedMessage&) { }
     /**
      * Apply, the functor to each message held
      */
-
     virtual void foreach(Functor) = 0;
-    /**
-     * Remove every message held that for which the specified
-     * predicate returns true
-     */
-    virtual void removeIf(Predicate) = 0;
-
   private:
 };
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h Mon Aug 27 15:40:33 2012
@@ -32,7 +32,7 @@ namespace broker {
 /**
  * Base class for all persistable objects
  */
-class Persistable : public RefCounted
+class Persistable : public virtual RefCounted
 {
 public:
     /**

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp Mon Aug 27 15:40:33 2012
@@ -30,163 +30,58 @@ using namespace qpid::broker;
 namespace qpid {
 namespace broker {
 
+PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {}
 PersistableMessage::~PersistableMessage() {}
 
-PersistableMessage::PersistableMessage() :
-    asyncDequeueCounter(0),
-    store(0),
-    asyncStore(0)
-{}
-
-void PersistableMessage::flush()
+void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i)
 {
-    syncList copy;
-    {
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-	if (store) {
-	    copy = synclist;
-	} else {
-            return;//early exit as nothing to do
-	}
+    ingressCompletion = i.get();
+    /**
+     * What follows is a hack to account for the fact that the
+     * AsyncCompletion to use may be, but is not always, this same
+     * object.
+     *
+     * This is hopefully temporary, and allows the store interface to
+     * remain unchanged without requiring another object to be allocated
+     * for every message.
+     *
+     * The case in question is where a message previously passed to
+     * the store is modified by some other queue onto which it is
+     * pushed, and then again persisted to the store. These will be
+     * two separate PersistableMessage instances (since the latter now
+     * has different content), but need to share the same
+     * AsyncCompletion (since they refer to the same incoming transfer
+     * command).
+     */
+    if (static_cast<RefCounted*>(ingressCompletion) != static_cast<RefCounted*>(this)) {
+        holder = i;
     }
-    for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
-        PersistableQueue::shared_ptr q(i->lock());
-        if (q) {
-            q->flush();
-        }
-    } 
-}
-
-void PersistableMessage::setContentReleased()
-{
-    contentReleaseState.released = true;
-}
-
-bool PersistableMessage::isContentReleased() const
-{ 
-    return contentReleaseState.released;
-}
-       
-
-bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
-    if (store && (queue->getPersistenceId()!=0)) {
-        for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
-            PersistableQueue::shared_ptr q(i->lock());
-            if (q && q->getPersistenceId() == queue->getPersistenceId())  return true;
-        } 
-    }            
-    return false;
 }
 
-// deprecated
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
-    if (_store){
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-        store = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
-    }
-}
 
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
-    if (_store){
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-        asyncStore = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
-    }
+void PersistableMessage::flush()
+{
+    //TODO: is this really the right place for this?
 }
 
 // deprecated
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
-    addToSyncList(queue, _store);
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+{
     enqueueStart();
 }
 
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
-    addToSyncList(queue, _store);
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*)
+{
     enqueueStart();
 }
 
-bool PersistableMessage::isDequeueComplete() { 
-    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-    return asyncDequeueCounter == 0;
-}
-    
-void PersistableMessage::dequeueComplete() { 
-    bool notify = false;
-    {
-        sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-        if (asyncDequeueCounter > 0) {
-            if (--asyncDequeueCounter == 0) {
-                notify = true;
-            }
-        }
-    }
-    if (notify) allDequeuesComplete();
-}
-
 // deprecated
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
-    if (_store){
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-        store = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
-    }
-    dequeueAsync();
-}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
 
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
-    if (_store){
-        sys::ScopedLock<sys::Mutex> l(storeLock);
-        asyncStore = _store;
-        boost::weak_ptr<PersistableQueue> q(queue);
-        synclist.push_back(q);
-    }
-    dequeueAsync();
-}
-
-void PersistableMessage::dequeueAsync() { 
-    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
-    asyncDequeueCounter++; 
-}
-
-PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
-
-// deprecated
-void PersistableMessage::setStore(MessageStore* s)
-{
-    store = s;
-}
-
-void PersistableMessage::setStore(AsyncStore* s)
-{
-    asyncStore = s;
-}
-
-void PersistableMessage::requestContentRelease()
-{
-    contentReleaseState.requested = true;
-}
-void PersistableMessage::blockContentRelease()
-{ 
-    contentReleaseState.blocked = true;
-}
-bool PersistableMessage::checkContentReleasable()
-{ 
-    return contentReleaseState.requested && !contentReleaseState.blocked;
-}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {}
 
-bool PersistableMessage::isContentReleaseBlocked()
-{
-    return contentReleaseState.blocked;
-}
-
-bool PersistableMessage::isContentReleaseRequested()
-{
-    return contentReleaseState.requested;
-}
+bool PersistableMessage::isDequeueComplete() { return false; }
+void PersistableMessage::dequeueComplete() {}
 
 }}
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h Mon Aug 27 15:40:33 2012
@@ -24,30 +24,32 @@
 
 #include <string>
 #include <list>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <map>
+#include <boost/intrusive_ptr.hpp>
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/Persistable.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/framing/amqp_framing.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/PersistableQueue.h"
 #include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/MessageHandle.h"
 
 namespace qpid {
+namespace types {
+class Variant;
+}
 namespace broker {
 
 class MessageStore;
 class AsyncStore;
+class Queue;
 
 /**
  * Base class for persistable messages.
  */
 class PersistableMessage : public Persistable
 {
-    typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Mutex asyncDequeueLock;
-    sys::Mutex storeLock;
-
     /**
      * "Ingress" messages == messages sent _to_ the broker.
      * Tracks the number of outstanding asynchronous operations that must
@@ -57,72 +59,26 @@ class PersistableMessage : public Persis
      * operations have completed, the transfer of this message from the client
      * may be considered complete.
      */
-    AsyncCompletion ingressCompletion;
-
-    /**
-     * Tracks the number of outstanding asynchronous dequeue
-     * operations. When the message is dequeued asynchronously the
-     * count is incremented; when that dequeue completes it is
-     * decremented. Thus when it is 0, there are no outstanding
-     * dequeues.
-     */
-    int asyncDequeueCounter;
-
-    void dequeueAsync();
-
-    syncList synclist;
-    struct ContentReleaseState
-    {
-        bool blocked;
-        bool requested;
-        bool released;
-        
-        ContentReleaseState();
-    };
-    ContentReleaseState contentReleaseState;
-
-  protected:
-    /** Called when all dequeues are complete for this message. */
-    virtual void allDequeuesComplete() = 0;
-
-    void setContentReleased();
-
-    MessageStore* store;    // deprecated, use AsyncStore
-    AsyncStore* asyncStore; // new AsyncStore interface
-
+    AsyncCompletion* ingressCompletion;
+    boost::intrusive_ptr<AsyncCompletion> holder;
+    mutable uint64_t persistenceId;
+    MessageHandle msgHandle;
 
   public:
-    typedef boost::shared_ptr<PersistableMessage> shared_ptr;
-
-    /**
-     * @returns the size of the headers when encoded
-     */
-    virtual uint32_t encodedHeaderSize() const = 0;
-
-    virtual ~PersistableMessage();
-
     PersistableMessage();
+    virtual ~PersistableMessage();
 
     void flush();
-    
-    QPID_BROKER_EXTERN bool isContentReleased() const;
-
-    QPID_BROKER_EXTERN void setStore(MessageStore*); // deprecated
-    QPID_BROKER_EXTERN void setStore(AsyncStore*);
-    void requestContentRelease();
-    void blockContentRelease();
-    bool checkContentReleasable();
-    bool isContentReleaseBlocked();
-    bool isContentReleaseRequested();
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
     /** track the progress of a message received by the broker - see ingressCompletion above */
-    QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
-    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
+    QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); }
+    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; }
+    QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
 
-    QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
-    QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
+    QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
+    QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
                                          MessageStore* _store);
@@ -131,18 +87,23 @@ class PersistableMessage : public Persis
 
 
     QPID_BROKER_EXTERN bool isDequeueComplete();
-    
     QPID_BROKER_EXTERN void dequeueComplete();
-
     QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
                                          MessageStore* _store);
     QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
                                          AsyncStore* _store);
 
-    bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
-    
-    void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); // deprecated
-    void addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store);
+    uint64_t getPersistenceId() const { return persistenceId; }
+    void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+    MessageHandle& getMessageHandle() { return msgHandle; }
+    const MessageHandle& getMessagehandle() const { return msgHandle; }
+
+
+    virtual void decodeHeader(framing::Buffer& buffer) = 0;
+    virtual void decodeContent(framing::Buffer& buffer) = 0;
+    virtual uint32_t encodedHeaderSize() const = 0;
+    virtual boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const = 0;
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp Mon Aug 27 15:40:33 2012
@@ -19,24 +19,53 @@
  *
  */
 #include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
+#include <algorithm>
 #include <cmath>
+#include <boost/bind.hpp>
 
 namespace qpid {
 namespace broker {
+namespace {
+class PriorityContext : public CursorContext {
+  public:
+    std::vector<QueueCursor> position;
+    PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
+};
+}
+
 
 PriorityQueue::PriorityQueue(int l) :
     levels(l),
-    messages(levels, Deque()),
-    frontLevel(0), haveFront(false), cached(false) {}
+    messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
+    counters(levels, framing::SequenceNumber()),
+    fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
+    frontLevel(0), haveFront(false), cached(false)
+{
+}
 
-bool PriorityQueue::deleted(const QueuedMessage& qm) {
-    bool deleted = fifo.deleted(qm);
-    if (deleted) erase(qm);
-    return deleted;
+bool PriorityQueue::deleted(const QueueCursor& c)
+{
+    MessagePointer* ptr = fifo.find(c);
+    if (ptr && ptr->holder) {
+        //mark the message as deleted
+        ptr->holder->message.setState(DELETED);
+        //clean the deque for the relevant priority level
+        boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context);
+        messages[ptr->holder->priority].clean();
+        //stop referencing that message holder (it may now have been
+        //deleted)
+        ptr->holder = 0;
+        //clean fifo index
+        fifo.clean();
+        return true;
+    } else {
+        return false;
+    }
 }
 
 size_t PriorityQueue::size()
@@ -44,85 +73,69 @@ size_t PriorityQueue::size()
     return fifo.size();
 }
 
-namespace {
-bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
-}
-
-void PriorityQueue::release(const QueuedMessage& message)
+Message* PriorityQueue::next(QueueCursor& cursor)
 {
-    QueuedMessage* qm = fifo.releasePtr(message);
-    if (qm) {
-        uint p = getPriorityLevel(message);
-        messages[p].insert(
-            lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
-        clearCache();
-    }
-}
-
-
-void PriorityQueue::erase(const QueuedMessage& qm) {
-    size_t i = getPriorityLevel(qm);
-    if (!messages[i].empty()) {
-        long diff = qm.position.getValue() - messages[i].front()->position.getValue();
-        if (diff < 0) return;
-        long maxEnd = std::min(size_t(diff), messages[i].size());
-        QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
-        Deque::iterator l =
-            lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
-        if (l != messages[i].end() && (*l)->position == qm.position) {
-            messages[i].erase(l);
-            clearCache();
-            return;
+    boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
+    if (!ctxt) {
+        ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER));
+        cursor.context = ctxt;
+    }
+    if (cursor.type == REPLICATOR) {
+        //browse in fifo order
+        MessagePointer* ptr = fifo.next(cursor);
+        return ptr ? &(ptr->holder->message) : 0;
+    } else if (cursor.type == PURGE) {
+        //iterate over messages in reverse priority order (i.e. purge lowest priority messages first)
+        //ignore any fairshare configuration here as well
+        for (int p = 0; p < levels; ++p) {
+            MessageHolder* holder = messages[p].next(ctxt->position[p]);
+            if (holder) {
+                cursor.setPosition(holder->message.getSequence(), 0);
+                return &(holder->message);
+            }
         }
+        return 0;
+    } else {
+        //check each level in turn, in priority order, for any more messages
+        Priority p = firstLevel();
+        do {
+            MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
+            if (holder) {
+                cursor.setPosition(holder->message.getSequence(), 0);
+                return &(holder->message);
+            }
+        } while (nextLevel(p));
+        return 0;
     }
 }
 
-bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const QueueCursor& cursor)
 {
-    bool acquired = fifo.acquire(position, message);
-    if (acquired) erase(message); // No longer available
-    return acquired;
+    return find(cursor.position, 0);
 }
 
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
 {
-    return fifo.find(position, message);
+    MessagePointer* ptr = fifo.find(position, cursor);
+    return ptr ? &(ptr->holder->message) : 0;
 }
 
-bool PriorityQueue::browse(
-    const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+void PriorityQueue::publish(const Message& published)
 {
-    return fifo.browse(position, message, unacquired);
+    MessageHolder holder;
+    holder.message = published;
+    holder.priority = getPriorityLevel(published);
+    holder.id = ++(counters[holder.priority]);
+    MessagePointer pointer;
+    pointer.holder = &(messages[holder.priority].publish(holder));
+    pointer.id = published.getSequence();
+    fifo.publish(pointer);
 }
 
-bool PriorityQueue::consume(QueuedMessage& message)
+Message* PriorityQueue::release(const QueueCursor& cursor)
 {
-    if (checkFront()) {
-        QueuedMessage* pm = messages[frontLevel].front();
-        messages[frontLevel].pop_front();
-        clearCache();
-        pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
-        message = *pm;
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
-    QueuedMessage* qmp = fifo.pushPtr(added);
-    messages[getPriorityLevel(added)].push_back(qmp);
-    clearCache();
-    return false; // Adding a message never causes one to be removed for deque
-}
-
-void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
-    fifo.updateAcquired(acquired);
-}
-
-void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
-    fifo.setPosition(n);
+    MessagePointer* ptr = fifo.release(cursor);
+    return ptr ? &(ptr->holder->message) : 0;
 }
 
 void PriorityQueue::foreach(Functor f)
@@ -130,62 +143,87 @@ void PriorityQueue::foreach(Functor f)
     fifo.foreach(f);
 }
 
-void PriorityQueue::removeIf(Predicate p)
-{
-    for (int priority = 0; priority < levels; ++priority) {
-        for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
-            if (p(**i)) {
-                (*i)->status = QueuedMessage::DELETED; // Updates fifo index
-                i = messages[priority].erase(i);
-                clearCache();
-            } else {
-                ++i;
-            }
-        }
-    }
-    fifo.clean();
-}
-
-uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+uint PriorityQueue::getPriorityLevel(const Message& m) const
 {
-    uint priority = m.payload->getPriority();
+    uint priority = m.getPriority();
     //Use AMQP 0-10 approach to mapping priorities to a fixed level
     //(see rule priority-level-implementation)
     const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
     if (priority <= firstLevel) return 0;
     return std::min(priority - firstLevel, (uint)levels-1);
 }
+PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
+{
+    PriorityQueue::MessagePointer pointer;
+    pointer.holder = 0;
+    pointer.id = id;
+    return pointer;
+}
 
-void PriorityQueue::clearCache()
+PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
 {
-    cached = false;
+    PriorityQueue::MessageHolder holder;
+    holder.id = id;
+    holder.message.setState(DELETED);
+    return holder;
 }
 
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+PriorityQueue::Priority PriorityQueue::firstLevel()
 {
-    for (int p = levels-1; p >= 0; --p) {
-        if (!m[p].empty()) {
-            l = p;
-            return true;
-        }
+    return Priority(levels - 1);
+}
+bool PriorityQueue::nextLevel(Priority& p)
+{
+    if (p.current > 0) {
+        --(p.current);
+        return true;
+    } else {
+        return false;
     }
-    return false;
 }
 
-bool PriorityQueue::checkFront()
+framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
+{
+    return id;
+}
+void PriorityQueue::MessageHolder::setState(MessageState s)
 {
-    if (!cached) {
-        haveFront = findFrontLevel(frontLevel, messages);
-        cached = true;
+    message.setState(s);
+}
+MessageState PriorityQueue::MessageHolder::getState() const
+{
+    return message.getState();
+}
+PriorityQueue::MessageHolder::operator Message&()
+{
+    return message;
+}
+framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
+{
+    if (holder) {
+        return holder->message.getSequence();
+    } else {
+        //this is used when the instance is merely acting as padding
+        return id;
     }
-    return haveFront;
 }
-
-uint PriorityQueue::getPriority(const QueuedMessage& message)
+void PriorityQueue::MessagePointer::setState(MessageState s)
 {
-    const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
-    if (queue) return queue->getPriorityLevel(message);
-    else return 0;
+    if (holder) {
+        holder->message.setState(s);
+    }
+}
+MessageState PriorityQueue::MessagePointer::getState() const
+{
+    if (holder) {
+        return holder->message.getState();
+    } else {
+        return DELETED;
+    }
+}
+PriorityQueue::MessagePointer::operator Message&()
+{
+    assert(holder);
+    return holder->message;
 }
-
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h Mon Aug 27 15:40:33 2012
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/IndexedDeque.h"
 #include "qpid/sys/IntegerTypes.h"
 #include <deque>
 #include <vector>
@@ -44,42 +45,63 @@ class PriorityQueue : public Messages
     virtual ~PriorityQueue() {}
     size_t size();
 
-    bool deleted(const QueuedMessage&);
-    void release(const QueuedMessage&);
-    bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool find(const framing::SequenceNumber&, QueuedMessage&);
-    bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
-    bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void updateAcquired(const QueuedMessage& acquired);
-    void setPosition(const framing::SequenceNumber&);
-    void foreach(Functor);
-    void removeIf(Predicate);
-
-    static uint getPriority(const QueuedMessage&);
+    bool deleted(const QueueCursor&);
+    void publish(const Message& added);
+    Message* next(QueueCursor&);
+    Message* release(const QueueCursor& cursor);
+    Message* find(const QueueCursor&);
+    Message* find(const framing::SequenceNumber&, QueueCursor*);
 
+    void foreach(Functor);
+    static uint getPriority(const Message&);
   protected:
-    typedef std::deque<QueuedMessage*> Deque;
-    typedef std::vector<Deque> PriorityLevels;
-    virtual bool findFrontLevel(uint& p, PriorityLevels&);
-
     const int levels;
+    struct Priority
+    {
+        const int start;
+        int current;
+        Priority(int s) : start(s), current(start) {}
+    };
+    virtual Priority firstLevel();
+    virtual bool nextLevel(Priority& );
 
   private:
-    /** Available messages separated by priority and sorted in priority order.
-     *  Holds pointers to the QueuedMessages in fifo
+    struct MessageHolder
+    {
+        Message message;
+        int priority;
+        framing::SequenceNumber id;
+        framing::SequenceNumber getSequence() const;
+        void setState(MessageState);
+        MessageState getState() const;
+        operator Message&();
+    };
+    struct MessagePointer
+    {
+        MessageHolder* holder;
+        framing::SequenceNumber id;//used only for padding
+        framing::SequenceNumber getSequence() const;
+        void setState(MessageState);
+        MessageState getState() const;
+        operator Message&();
+    };
+    typedef IndexedDeque<MessageHolder> Deque;
+    typedef std::vector<Deque> PriorityLevels;
+    typedef std::vector<framing::SequenceNumber> Counters;
+
+    /** Holds the messages themselves, separated by priority level; the fifo index holds pointers into these.
      */
     PriorityLevels messages;
-    /** FIFO index of all messsagse (including acquired messages)  for fast browsing and indexing */
-    MessageDeque fifo;
+    Counters counters;
+    /** FIFO index of messages for fast browsing and indexing */
+    IndexedDeque<MessagePointer> fifo;
     uint frontLevel;
     bool haveFront;
     bool cached;
 
-    void erase(const QueuedMessage&);
-    uint getPriorityLevel(const QueuedMessage&) const;
-    void clearCache();
-    bool checkFront();
+    uint getPriorityLevel(const Message&) const;
+    MessageHolder priorityPadding(qpid::framing::SequenceNumber);
+    MessagePointer fifoPadding(qpid::framing::SequenceNumber);
 };
 
 }} // namespace qpid::broker



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org