You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC
svn commit: r1377715 [3/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/
cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/
cpp/src/qpid/asyncStore/ cpp/src/qpid...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.h Mon Aug 27 15:40:33 2012
@@ -30,21 +30,22 @@
namespace qpid {
namespace broker {
- class Message;
- class MessageStore;
+ namespace amqp_0_10 {
+ class MessageTransfer;
+ }
class QPID_BROKER_CLASS_EXTERN MessageBuilder : public framing::FrameHandler{
public:
- QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
+ QPID_BROKER_EXTERN MessageBuilder();
QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);
- boost::intrusive_ptr<Message> getMessage() { return message; }
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage();
QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
void end();
private:
enum State {DORMANT, METHOD, HEADER, CONTENT};
State state;
- boost::intrusive_ptr<Message> message;
- MessageStore* const store;
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message;
+ std::string exchange;
void checkType(uint8_t expected, uint8_t actual);
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp Mon Aug 27 15:40:33 2012
@@ -19,218 +19,71 @@
*
*/
#include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/log/Statement.h"
#include "assert.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
+namespace {
+Message padding(qpid::framing::SequenceNumber id) {
+ Message m;
+ m.setState(DELETED);
+ m.setSequence(id);
+ return m;
+}
+}
-MessageDeque::MessageDeque() : available(0), head(0) {}
+using qpid::framing::SequenceNumber;
-size_t MessageDeque::index(const framing::SequenceNumber& position)
+MessageDeque::MessageDeque() : messages(&padding) {}
+
+
+bool MessageDeque::deleted(const QueueCursor& cursor)
{
- //assuming a monotonic sequence, with no messages removed except
- //from the ends of the deque, we can use the position to determin
- //an index into the deque
- if (messages.empty() || position < messages.front().position) return 0;
- return position - messages.front().position;
-}
-
-bool MessageDeque::deleted(const QueuedMessage& m)
-{
- size_t i = index(m.position);
- if (i < messages.size()) {
- QueuedMessage *qm = &messages[i];
- if (qm->status != QueuedMessage::DELETED) {
- qm->status = QueuedMessage::DELETED;
- qm->payload = 0; // message no longer needed
- clean();
- return true;
- }
- }
- return false;
+ return messages.deleted(cursor);
}
-size_t MessageDeque::size()
+void MessageDeque::publish(const Message& added)
{
- return available;
+ messages.publish(added);
}
-QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
+Message* MessageDeque::release(const QueueCursor& cursor)
{
- size_t i = index(message.position);
- if (i < messages.size()) {
- QueuedMessage& m = messages[i];
- if (m.status == QueuedMessage::ACQUIRED) {
- if (head > i) head = i;
- m.status = QueuedMessage::AVAILABLE;
- ++available;
- return &messages[i];
- }
- } else {
- assert(0);
- QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
- }
- return 0;
-}
-
-void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
-
-bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
-{
- if (position < messages.front().position) return false;
- size_t i = index(position);
- if (i < messages.size()) {
- QueuedMessage& temp = messages[i];
- if (temp.status == QueuedMessage::AVAILABLE) {
- temp.status = QueuedMessage::ACQUIRED;
- --available;
- message = temp;
- return true;
- }
- }
- return false;
-}
-
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
-{
- size_t i = index(position);
- if (i < messages.size()) {
- message = messages[i];
- return true;
- } else {
- return false;
- }
-}
-
-bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
-{
- //get first message that is greater than position
- size_t i = index(position + 1);
- while (i < messages.size()) {
- QueuedMessage& m = messages[i++];
- if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
- message = m;
- return true;
- }
- }
- return false;
-}
-
-bool MessageDeque::consume(QueuedMessage& message)
-{
- while (head < messages.size()) {
- QueuedMessage& i = messages[head++];
- if (i.status == QueuedMessage::AVAILABLE) {
- i.status = QueuedMessage::ACQUIRED;
- --available;
- message = i;
- return true;
- }
- }
- return false;
+ return messages.release(cursor);
}
-namespace {
-QueuedMessage padding(uint32_t pos) {
- return QueuedMessage(0, 0, pos, QueuedMessage::DELETED);
+Message* MessageDeque::next(QueueCursor& cursor)
+{
+ return messages.next(cursor);
}
-} // namespace
-QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
- //add padding to prevent gaps in sequence, which break the index
- //calculation (needed for queue replication)
- while (messages.size() && (added.position - messages.back().position) > 1)
- messages.push_back(padding(messages.back().position + 1));
- messages.push_back(added);
- messages.back().status = QueuedMessage::AVAILABLE;
- if (head >= messages.size()) head = messages.size() - 1;
- ++available;
- clean(); // QPID-4046: let producer help clean the backlog of deleted messages
- return &messages.back();
-}
-
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
- pushPtr(added);
- return false; // adding a message never causes one to be removed for deque
-}
-
-void MessageDeque::updateAcquired(const QueuedMessage& acquired)
-{
- // Pad the front of the queue if necessary
- while (messages.size() && (acquired.position < messages.front().position))
- messages.push_front(padding(uint32_t(messages.front().position) - 1));
- size_t i = index(acquired.position);
- if (i < messages.size()) { // Replace an existing padding message
- assert(messages[i].status == QueuedMessage::DELETED);
- messages[i] = acquired;
- messages[i].status = QueuedMessage::ACQUIRED;
- }
- else { // Push to the back
- // Pad the back of the queue if necessary
- while (messages.size() && (acquired.position - messages.back().position) > 1)
- messages.push_back(padding(messages.back().position + 1));
- assert(!messages.size() || (acquired.position - messages.back().position) == 1);
- messages.push_back(acquired);
- messages.back().status = QueuedMessage::ACQUIRED;
- }
+size_t MessageDeque::size()
+{
+ return messages.size();
}
-namespace {
-bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
-} // namespace
+Message* MessageDeque::find(const framing::SequenceNumber& position, QueueCursor* cursor)
+{
+ return messages.find(position, cursor);
+}
-void MessageDeque::setPosition(const framing::SequenceNumber& n) {
- size_t i = index(n+1);
- if (i >= messages.size()) return; // Nothing to do.
-
- // Assertion to verify the precondition: no messaages after n.
- assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
- messages.end());
- messages.erase(messages.begin()+i, messages.end());
- if (head >= messages.size()) head = messages.size() - 1;
- // Re-count the available messages
- available = 0;
- for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->status == QueuedMessage::AVAILABLE) ++available;
- }
-}
-
-void MessageDeque::clean()
-{
- // QPID-4046: If a queue has multiple consumers, then it is possible for a large
- // collection of deleted messages to build up. Limit the number of messages cleaned
- // up on each call to clean().
- size_t count = 0;
- while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
- messages.pop_front();
- count += 1;
- }
- head = (head > count) ? head - count : 0;
+Message* MessageDeque::find(const QueueCursor& cursor)
+{
+ return messages.find(cursor);
}
void MessageDeque::foreach(Functor f)
{
- for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->status == QueuedMessage::AVAILABLE) {
- f(*i);
- }
- }
-}
-
-void MessageDeque::removeIf(Predicate p)
-{
- for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
- //Use special status for this as messages are not yet
- //dequeued, but should not be considered on the queue
- //either (used for purging and moving)
- i->status = QueuedMessage::REMOVED;
- --available;
- }
- }
- clean();
+ messages.foreach(f);
+}
+
+void MessageDeque::resetCursors()
+{
+ messages.resetCursors();
}
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h Mon Aug 27 15:40:33 2012
@@ -22,8 +22,7 @@
*
*/
#include "qpid/broker/Messages.h"
-#include "qpid/broker/QueuedMessage.h"
-#include <deque>
+#include "qpid/broker/IndexedDeque.h"
namespace qpid {
namespace broker {
@@ -36,31 +35,20 @@ class MessageDeque : public Messages
public:
MessageDeque();
size_t size();
- bool deleted(const QueuedMessage&);
- void release(const QueuedMessage&);
- bool acquire(const framing::SequenceNumber&, QueuedMessage&);
- bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool consume(QueuedMessage&);
- bool push(const QueuedMessage& added, QueuedMessage& removed);
- void updateAcquired(const QueuedMessage& acquired);
- void setPosition(const framing::SequenceNumber&);
+ bool deleted(const QueueCursor&);
+ void publish(const Message& added);
+ Message* next(QueueCursor&);
+ Message* release(const QueueCursor& cursor);
+ Message* find(const QueueCursor&);
+ Message* find(const framing::SequenceNumber&, QueueCursor*);
+
void foreach(Functor);
- void removeIf(Predicate);
- // For use by other Messages implementations that use MessageDeque as a FIFO index
- // and keep pointers to its elements in their own indexing strctures.
- void clean();
- QueuedMessage* releasePtr(const QueuedMessage&);
- QueuedMessage* pushPtr(const QueuedMessage& added);
+ void resetCursors();
private:
- typedef std::deque<QueuedMessage> Deque;
+ typedef IndexedDeque<Message> Deque;
Deque messages;
- size_t available;
- size_t head;
-
- size_t index(const framing::SequenceNumber&);
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDistributor.h Mon Aug 27 15:40:33 2012
@@ -21,51 +21,28 @@
* under the License.
*
*/
-
+#include "qpid/types/Variant.h"
/** Abstraction used by Queue to determine the next "most desirable" message to provide to
* a particular consuming client
*/
-
-#include "qpid/broker/Consumer.h"
-
namespace qpid {
namespace broker {
-struct QueuedMessage;
+class Message;
class MessageDistributor
{
public:
virtual ~MessageDistributor() {};
- /** Locking Note: all methods assume the caller is holding the Queue::messageLock
- * during the method call.
- */
-
- /** Determine the next message available for consumption by the consumer
- * @param consumer the consumer that needs a message to consume
- * @param next set to the next message that the consumer may consume.
- * @return true if message is available and next is set
- */
- virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
- QueuedMessage& next ) = 0;
-
- /** Allow the comsumer to take ownership of the given message.
+ /**
+ * Determine whether the named consumer can take ownership of the specified message.
* @param consumer the name of the consumer that is attempting to acquire the message
- * @param qm the message to be acquired, previously returned from nextConsumableMessage()
+ * @param target the message to be acquired
* @return true if ownership is permitted, false if ownership cannot be assigned.
*/
- virtual bool allocate( const std::string& consumer,
- const QueuedMessage& target) = 0;
-
- /** Determine the next message available for browsing by the consumer
- * @param consumer the consumer that is browsing the queue
- * @param next set to the next message that the consumer may browse.
- * @return true if a message is available and next is returned
- */
- virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
- QueuedMessage& next ) = 0;
+ virtual bool acquire(const std::string& consumer, Message& target) = 0;
/** hook to add any interesting management state to the status map */
virtual void query(qpid::types::Variant::Map&) const = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.cpp Mon Aug 27 15:40:33 2012
@@ -1,4 +1,4 @@
-/*
+ /*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,10 +20,16 @@
*/
#include "qpid/broker/MessageGroupManager.h"
-
-#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/TypeCode.h"
+#include "qpid/types/Variant.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
@@ -75,24 +81,16 @@ void MessageGroupManager::disown( GroupS
freeGroups[state.members.front().position] = &state;
}
-MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
+MessageGroupManager::GroupState& MessageGroupManager::findGroup( const Message& m )
{
- uint32_t thisMsg = qm.position.getValue();
+ uint32_t thisMsg = m.getSequence().getValue();
if (cachedGroup && lastMsg == thisMsg) {
hits++;
return *cachedGroup;
}
- std::string group = defaultGroupId;
- const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
- if (headers) {
- qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
- if (id && id->convertsTo<std::string>()) {
- std::string tmp = id->get<std::string>();
- if (!tmp.empty()) // empty group is reserved
- group = tmp;
- }
- }
+ std::string group = m.getPropertyAsString(groupIdHeader);
+ if (group.empty()) group = defaultGroupId; //empty group is reserved
if (cachedGroup && group == lastGroup) {
hits++;
@@ -112,48 +110,48 @@ MessageGroupManager::GroupState& Message
}
-void MessageGroupManager::enqueued( const QueuedMessage& qm )
+void MessageGroupManager::enqueued( const Message& m )
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
- GroupState& state = findGroup(qm);
- GroupState::MessageState mState(qm.position);
+ GroupState& state = findGroup(m);
+ GroupState::MessageState mState(m.getSequence());
state.members.push_back(mState);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << state.group << " total=" << total );
if (total == 1) {
// newly created group, no owner
- assert(freeGroups.find(qm.position) == freeGroups.end());
- freeGroups[qm.position] = &state;
+ assert(freeGroups.find(m.getSequence()) == freeGroups.end());
+ freeGroups[m.getSequence()] = &state;
}
}
-void MessageGroupManager::acquired( const QueuedMessage& qm )
+void MessageGroupManager::acquired( const Message& m )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- GroupState& state = findGroup(qm);
- GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
- assert(m != state.members.end());
- m->acquired = true;
+ GroupState& state = findGroup(m);
+ GroupState::MessageFifo::iterator gm = state.findMsg(m.getSequence());
+ assert(gm != state.members.end());
+ gm->acquired = true;
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << state.group << " acquired=" << state.acquired );
}
-void MessageGroupManager::requeued( const QueuedMessage& qm )
+void MessageGroupManager::requeued( const Message& m )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- GroupState& state = findGroup(qm);
+ GroupState& state = findGroup(m);
assert( state.acquired != 0 );
state.acquired -= 1;
- GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
- assert(m != state.members.end());
- m->acquired = false;
+ GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
+ assert(i != state.members.end());
+ i->acquired = false;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
@@ -164,14 +162,14 @@ void MessageGroupManager::requeued( cons
}
-void MessageGroupManager::dequeued( const QueuedMessage& qm )
+void MessageGroupManager::dequeued( const Message& m )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- GroupState& state = findGroup(qm);
- GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
- assert(m != state.members.end());
- if (m->acquired) {
+ GroupState& state = findGroup(m);
+ GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
+ assert(i != state.members.end());
+ if (i->acquired) {
assert( state.acquired != 0 );
state.acquired -= 1;
}
@@ -179,7 +177,7 @@ void MessageGroupManager::dequeued( cons
// special case if qm is first (oldest) message in the group:
// may need to re-insert it back on the freeGroups list, as the index will change
bool reFreeNeeded = false;
- if (m == state.members.begin()) {
+ if (i == state.members.begin()) {
if (!state.owned()) {
// will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
// if on freelist, it is indexed by first member, which is about to be removed!
@@ -188,7 +186,7 @@ void MessageGroupManager::dequeued( cons
}
state.members.pop_front();
} else {
- state.members.erase(m);
+ state.members.erase(i);
}
uint32_t total = state.members.size();
@@ -206,6 +204,12 @@ void MessageGroupManager::dequeued( cons
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
disown(state);
+ MessageDeque* md = dynamic_cast<MessageDeque*>(&messages);
+ if (md) {
+ md->resetCursors();
+ } else {
+ QPID_LOG(warning, "Could not reset cursors for message group, unexpected container type");
+ }
} else if (reFreeNeeded) {
disown(state);
}
@@ -215,55 +219,27 @@ MessageGroupManager::~MessageGroupManage
{
QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
}
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+
+bool MessageGroupManager::acquire(const std::string& consumer, Message& m)
{
- if (!messages.size())
- return false;
+ if (m.getState() == AVAILABLE) {
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ GroupState& state = findGroup(m);
- next.position = c->getPosition();
- if (!freeGroups.empty()) {
- const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
- if (nextFree <= next.position) { // take oldest free
- next.position = nextFree;
- --next.position;
+ if (!state.owned()) {
+ own( state, consumer );
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << consumer << " has acquired group id=" << state.group);
}
- }
-
- while (messages.browse( next.position, next, true )) {
- GroupState& group = findGroup(next);
- if (!group.owned()) {
- //TODO: make acquire more efficient when we already have the message in question
- if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head!
- return true;
- }
- QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
- << "'s head message still pending. pos=" << group.members.front().position);
- } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
+ if (state.owner == consumer) {
+ m.setState(ACQUIRED);
return true;
+ } else {
+ return false;
}
+ } else {
+ return false;
}
- return false;
-}
-
-
-bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
-{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- GroupState& state = findGroup(qm);
-
- if (!state.owned()) {
- own( state, consumer );
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << consumer << " has acquired group id=" << state.group);
- return true;
- }
- return state.owner == consumer;
-}
-
-bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
- // browse: allow access to any available msg, regardless of group ownership (?ok?)
- return messages.browse(c->getPosition(), next, false);
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
@@ -296,11 +272,9 @@ void MessageGroupManager::query(qpid::ty
// set the timestamp to the arrival timestamp of the oldest (HEAD) message, if present
info[GROUP_TIMESTAMP] = 0;
if (g->second.members.size() != 0) {
- QueuedMessage qm;
- if (messages.find(g->second.members.front().position, qm) &&
- qm.payload &&
- qm.payload->hasProperties<framing::DeliveryProperties>()) {
- info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
+ Message* m = messages.find(g->second.members.front().position, 0);
+ if (m && m->getTimestamp()) {
+ info[GROUP_TIMESTAMP] = m->getTimestamp();
}
}
info[GROUP_CONSUMER] = g->second.owner;
@@ -313,33 +287,13 @@ void MessageGroupManager::query(qpid::ty
boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
Messages& messages,
- const qpid::framing::FieldTable& settings )
+ const QueueSettings& settings )
{
- boost::shared_ptr<MessageGroupManager> empty;
-
- if (settings.isSet(qpidMessageGroupKey)) {
-
- // @todo: remove once "sticky" consumers are supported - see QPID-3347
- if (!settings.isSet(qpidSharedGroup)) {
- QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
- return empty;
- }
-
- std::string headerKey = settings.getAsString(qpidMessageGroupKey);
- if (headerKey.empty()) {
- QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
- return empty;
- }
- unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
-
- boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
-
- QPID_LOG( debug, "Configured Queue '" << qName <<
- "' for message grouping using header key '" << headerKey << "'" <<
- " (timestamp=" << timestamp << ")");
- return manager;
- }
- return empty;
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( settings.groupKey, qName, messages, settings.addTimestamp ) );
+ QPID_LOG( debug, "Configured Queue '" << qName <<
+ "' for message grouping using header key '" << settings.groupKey << "'" <<
+ " (timestamp=" << settings.addTimestamp << ")");
+ return manager;
}
std::string MessageGroupManager::defaultGroupId;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageGroupManager.h Mon Aug 27 15:40:33 2012
@@ -24,8 +24,10 @@
/* for managing message grouping on Queues */
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/MessageDistributor.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/unordered_map.h"
#include <deque>
@@ -34,6 +36,7 @@ namespace qpid {
namespace broker {
class QueueObserver;
+struct QueueSettings;
class MessageDistributor;
class Messages;
@@ -76,11 +79,7 @@ class MessageGroupManager : public State
GroupFifo freeGroups; // ordered by oldest free msg
//Consumers consumers; // index: consumer name
- static const std::string qpidMessageGroupKey;
- static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
- static const std::string qpidMessageGroupTimestamp;
-
- GroupState& findGroup( const QueuedMessage& qm );
+ GroupState& findGroup( const Message& m );
unsigned long hits, misses; // for debug
uint32_t lastMsg;
std::string lastGroup;
@@ -91,11 +90,14 @@ class MessageGroupManager : public State
void disown( GroupState& state );
public:
+ static const std::string qpidMessageGroupKey;
+ static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
+ static const std::string qpidMessageGroupTimestamp;
static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
Messages& messages,
- const qpid::framing::FieldTable& settings );
+ const QueueSettings& settings );
MessageGroupManager(const std::string& header, const std::string& _qName,
Messages& container, unsigned int _timestamp=0 )
@@ -106,22 +108,20 @@ class MessageGroupManager : public State
virtual ~MessageGroupManager();
// QueueObserver iface
- void enqueued( const QueuedMessage& qm );
- void acquired( const QueuedMessage& qm );
- void requeued( const QueuedMessage& qm );
- void dequeued( const QueuedMessage& qm );
+ void enqueued( const Message& qm );
+ void acquired( const Message& qm );
+ void requeued( const Message& qm );
+ void dequeued( const Message& qm );
void consumerAdded( const Consumer& ) {};
void consumerRemoved( const Consumer& ) {};
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);
// MessageDistributor iface
- bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
- bool allocate(const std::string& c, const QueuedMessage& qm);
- bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ bool acquire(const std::string& c, Message& );
void query(qpid::types::Variant::Map&) const;
- bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+ bool match(const qpid::types::Variant::Map*, const Message&) const;
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp Mon Aug 27 15:40:33 2012
@@ -19,7 +19,8 @@
*
*/
#include "qpid/broker/MessageMap.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/log/Statement.h"
#include <algorithm>
@@ -29,29 +30,17 @@ namespace {
const std::string EMPTY;
}
-bool MessageMap::deleted(const QueuedMessage& message)
-{
- Ordering::iterator i = messages.find(message.position);
- if (i != messages.end()) {
- erase(i);
- return true;
- } else {
- return false;
- }
-}
-std::string MessageMap::getKey(const QueuedMessage& message)
+std::string MessageMap::getKey(const Message& message)
{
- const framing::FieldTable* ft = message.payload->getApplicationHeaders();
- if (ft) return ft->getAsString(key);
- else return EMPTY;
+ return message.getPropertyAsString(key);
}
size_t MessageMap::size()
{
size_t count(0);
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+ if (i->second.getState() == AVAILABLE) ++count;
}
return count;
}
@@ -61,116 +50,103 @@ bool MessageMap::empty()
return size() == 0;//TODO: more efficient implementation
}
-void MessageMap::release(const QueuedMessage& message)
-{
- Ordering::iterator i = messages.find(message.position);
- if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
- i->second.status = QueuedMessage::AVAILABLE;
- }
-}
-
-bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::deleted(const QueueCursor& cursor)
{
- Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
+ Ordering::iterator i = messages.find(cursor.position);
+ if (i != messages.end()) {
+ erase(i);
return true;
} else {
return false;
}
}
-bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* MessageMap::find(const QueueCursor& cursor)
{
- Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ if (cursor.valid) return find(cursor.position, 0);
+ else return 0;
}
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
- Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
- message = i->second;
- return true;
+ Ordering::iterator i = messages.lower_bound(position);
+ if (i != messages.end()) {
+ if (cursor) cursor->setPosition(i->first, version);
+ if (i->first == position) return &(i->second);
+ else return 0;
} else {
- return false;
+ //there is no message whose sequence is greater than or equal to
+ //position, i.e. haven't got there yet
+ if (cursor) cursor->setPosition(position, version);
+ return 0;
}
}
-bool MessageMap::consume(QueuedMessage& message)
+Message* MessageMap::next(QueueCursor& cursor)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
- return true;
+ Ordering::iterator i;
+ if (!cursor.valid) i = messages.begin(); //start with oldest message
+ else i = messages.upper_bound(cursor.position); //get first message that is greater than position
+
+ while (i != messages.end()) {
+ Message& m = i->second;
+ cursor.setPosition(m.getSequence(), version);
+ if (cursor.check(m)) {
+ return &m;
+ } else {
+ ++i;
}
}
- return false;
+ return 0;
}
-const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+const Message& MessageMap::replace(const Message& original, const Message& update)
{
- messages.erase(original.position);
- messages[update.position] = update;
- return update;
+ messages.erase(original.getSequence());
+ std::pair<Ordering::iterator, bool> i = messages.insert(Ordering::value_type(update.getSequence(), update));
+ i.first->second.setState(AVAILABLE);
+ return i.first->second;
}
-bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+void MessageMap::publish(const Message& added)
+{
+ Message dummy;
+ update(added, dummy);
+}
+
+bool MessageMap::update(const Message& added, Message& removed)
{
std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
if (result.second) {
//there was no previous message for this key; nothing needs to
//be removed, just add the message into its correct position
- QueuedMessage& a = messages[added.position];
- a = added;
- a.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Added message " << a);
+ messages.insert(Ordering::value_type(added.getSequence(), added)).first->second.setState(AVAILABLE);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
- result.first->second.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
+ result.first->second.setState(AVAILABLE);
+ QPID_LOG(debug, "Displaced message at " << removed.getSequence() << " with " << result.first->second.getSequence() << ": " << result.first->first);
return true;
}
}
-void MessageMap::setPosition(const framing::SequenceNumber& seq) {
- // Nothing to do, just assert that the precondition is respected and there
- // are no undeleted messages after seq.
- (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
-}
-
-void MessageMap::foreach(Functor f)
+Message* MessageMap::release(const QueueCursor& cursor)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
+ Ordering::iterator i = messages.find(cursor.position);
+ if (i != messages.end()) {
+ i->second.setState(AVAILABLE);
+ return &i->second;
+ } else {
+ return 0;
}
}
-void MessageMap::removeIf(Predicate p)
+void MessageMap::foreach(Functor f)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end();) {
- if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
- index.erase(getKey(i->second));
- //Note: Removing from messages means that the subsequent
- //call to deleted() for the same message will return
- //false. At present that is not a problem. If this were
- //changed to hold onto the message until dequeued
- //(e.g. with REMOVED state), then the erase() below would
- //need to take that into account.
- messages.erase(i++);
- } else {
- ++i;
- }
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.getState() == AVAILABLE) f(i->second);
}
}
@@ -180,6 +156,6 @@ void MessageMap::erase(Ordering::iterato
messages.erase(i);
}
-MessageMap::MessageMap(const std::string& k) : key(k) {}
+MessageMap::MessageMap(const std::string& k) : key(k), version(0) {}
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h Mon Aug 27 15:40:33 2012
@@ -6,7 +6,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
-o * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -22,6 +22,7 @@ o * regarding copyright ownership. The
*
*/
#include "qpid/broker/Messages.h"
+#include "qpid/broker/Message.h"
#include "qpid/framing/SequenceNumber.h"
#include <map>
#include <string>
@@ -38,32 +39,31 @@ class MessageMap : public Messages
{
public:
MessageMap(const std::string& key);
- virtual ~MessageMap() {}
size_t size();
bool empty();
- virtual bool deleted(const QueuedMessage&);
- void release(const QueuedMessage&);
- virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
- bool find(const framing::SequenceNumber&, QueuedMessage&);
- virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool consume(QueuedMessage&);
- virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
- void setPosition(const framing::SequenceNumber&);
+ bool deleted(const QueueCursor&);
+ void publish(const Message& added);//use update instead to get replaced message
+ Message* next(QueueCursor&);
+ Message* release(const QueueCursor& cursor);
+ Message* find(const QueueCursor&);
+ Message* find(const framing::SequenceNumber&, QueueCursor*);
void foreach(Functor);
- virtual void removeIf(Predicate);
+
+ bool update(const Message& added, Message& removed);
protected:
- typedef std::map<std::string, QueuedMessage> Index;
- typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering;
+ typedef std::map<std::string, Message> Index;
+ typedef std::map<framing::SequenceNumber, Message> Ordering;
const std::string key;
Index index;
Ordering messages;
+ int32_t version;
- std::string getKey(const QueuedMessage&);
- virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+ std::string getKey(const Message&);
+ virtual const Message& replace(const Message&, const Message&);
void erase(Ordering::iterator);
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h Mon Aug 27 15:40:33 2012
@@ -29,7 +29,8 @@ namespace framing {
class SequenceNumber;
}
namespace broker {
-struct QueuedMessage;
+class Message;
+class QueueCursor;
/**
* This interface abstracts out the access to the messages held for
@@ -39,8 +40,7 @@ struct QueuedMessage;
class Messages
{
public:
- typedef boost::function1<void, QueuedMessage&> Functor;
- typedef boost::function1<bool, QueuedMessage&> Predicate;
+ typedef boost::function1<void, Message&> Functor;
virtual ~Messages() {}
/**
@@ -51,47 +51,44 @@ class Messages
/**
* Called when a message is deleted from the queue.
*/
- virtual bool deleted(const QueuedMessage&) = 0;
+ virtual bool deleted(const QueueCursor&) = 0;
/**
- * Releases an acquired message, making it available again.
+ * Makes a message available.
*/
- virtual void release(const QueuedMessage&) = 0;
+ virtual void publish(const Message& added) = 0;
/**
- * Acquire the message at the specified position, returning true
- * if found, false otherwise. The acquired message is passed back
- * via the second parameter.
- */
- virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0;
- /**
- * Find the message at the specified position, returning true if
- * found, false otherwise. The matched message is passed back via
- * the second parameter.
+ * Retrieve the next message for the given cursor. The message is
+ * returned by pointer and the cursor is advanced to refer to it.
+ *
+ * @return a pointer to the message if there is one, in which case
+ * the cursor that points to it is assigned to cursor; null
+ * otherwise.
*/
- virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ virtual Message* next(QueueCursor& cursor) = 0;
+
/**
- * Retrieve the next message to be given to a browsing
- * subscription that has reached the specified position. The next
- * message is passed back via the second parameter.
+ * Release the message i.e. return it to the available state
+ * unless it has already been deleted.
*
- * @param unacquired, if true, will only browse unacquired messages
- *
- * @return true if there is another message, false otherwise.
+ * @return a pointer to the Message if it is still in acquired state and
+ * hence can be released; null if it has already been deleted
*/
- virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0;
+ virtual Message* release(const QueueCursor& cursor) = 0;
/**
- * Retrieve the next message available for a consuming
- * subscription.
- *
- * @return true if there is such a message, false otherwise.
+ * Find the message with the specified sequence number, returning
+ * a pointer if found, null otherwise. If the second parameter is
+ * non-null, a cursor is passed back through it regardless of
+ * whether the message is found; calling next() with that cursor
+ * will give the next message greater than position, if one
+ * exists.
*/
- virtual bool consume(QueuedMessage&) = 0;
+ virtual Message* find(const framing::SequenceNumber&, QueueCursor*) = 0;
+
/**
- * Pushes a message to the back of the 'queue'. For some types of
- * queue this may cause another message to be removed; if that is
- * the case the method will return true and the removed message
- * will be passed out via the second parameter.
+ * Find the message at the specified position, returning a pointer if
+ * found, null otherwise.
*/
- virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+ virtual Message* find(const QueueCursor&) = 0;
/**
* Add an already acquired message to the queue.
@@ -99,25 +96,11 @@ class Messages
* Only need be implemented by subclasses that keep track of
* acquired messages.
*/
- virtual void updateAcquired(const QueuedMessage&) { }
-
- /**
- * Set the position of the back of the queue. Next message enqueued will be n+1.
- *@pre Any messages with seq > n must already be dequeued.
- */
- virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
-
+ //virtual void updateAcquired(const QueuedMessage&) { }
/**
* Apply, the functor to each message held
*/
-
virtual void foreach(Functor) = 0;
- /**
- * Remove every message held that for which the specified
- * predicate returns true
- */
- virtual void removeIf(Predicate) = 0;
-
private:
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Persistable.h Mon Aug 27 15:40:33 2012
@@ -32,7 +32,7 @@ namespace broker {
/**
* Base class for all persistable objects
*/
-class Persistable : public RefCounted
+class Persistable : public virtual RefCounted
{
public:
/**
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp Mon Aug 27 15:40:33 2012
@@ -30,163 +30,58 @@ using namespace qpid::broker;
namespace qpid {
namespace broker {
+PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {}
PersistableMessage::~PersistableMessage() {}
-PersistableMessage::PersistableMessage() :
- asyncDequeueCounter(0),
- store(0),
- asyncStore(0)
-{}
-
-void PersistableMessage::flush()
+void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i)
{
- syncList copy;
- {
- sys::ScopedLock<sys::Mutex> l(storeLock);
- if (store) {
- copy = synclist;
- } else {
- return;//early exit as nothing to do
- }
+ ingressCompletion = i.get();
+ /**
+ * What follows is a hack to account for the fact that the
+ * AsyncCompletion to use may be, but is not always, this same
+ * object.
+ *
+ * This is hopefully temporary, and allows the store interface to
+ * remain unchanged without requiring another object to be allocated
+ * for every message.
+ *
+ * The case in question is where a message previously passed to
+ * the store is modified by some other queue onto which it is
+ * pushed, and then again persisted to the store. These will be
+ * two separate PersistableMessage instances (since the latter now
+ * has different content), but need to share the same
+ * AsyncCompletion (since they refer to the same incoming transfer
+ * command).
+ */
+ if (static_cast<RefCounted*>(ingressCompletion) != static_cast<RefCounted*>(this)) {
+ holder = i;
}
- for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
- PersistableQueue::shared_ptr q(i->lock());
- if (q) {
- q->flush();
- }
- }
-}
-
-void PersistableMessage::setContentReleased()
-{
- contentReleaseState.released = true;
-}
-
-bool PersistableMessage::isContentReleased() const
-{
- return contentReleaseState.released;
-}
-
-
-bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
- if (store && (queue->getPersistenceId()!=0)) {
- for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
- PersistableQueue::shared_ptr q(i->lock());
- if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
- }
- }
- return false;
}
-// deprecated
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
- if (_store){
- sys::ScopedLock<sys::Mutex> l(storeLock);
- store = _store;
- boost::weak_ptr<PersistableQueue> q(queue);
- synclist.push_back(q);
- }
-}
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
- if (_store){
- sys::ScopedLock<sys::Mutex> l(storeLock);
- asyncStore = _store;
- boost::weak_ptr<PersistableQueue> q(queue);
- synclist.push_back(q);
- }
+void PersistableMessage::flush()
+{
+ //TODO: is this really the right place for this?
}
// deprecated
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
- addToSyncList(queue, _store);
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+{
enqueueStart();
}
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
- addToSyncList(queue, _store);
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*)
+{
enqueueStart();
}
-bool PersistableMessage::isDequeueComplete() {
- sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- return asyncDequeueCounter == 0;
-}
-
-void PersistableMessage::dequeueComplete() {
- bool notify = false;
- {
- sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- if (asyncDequeueCounter > 0) {
- if (--asyncDequeueCounter == 0) {
- notify = true;
- }
- }
- }
- if (notify) allDequeuesComplete();
-}
-
// deprecated
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
- if (_store){
- sys::ScopedLock<sys::Mutex> l(storeLock);
- store = _store;
- boost::weak_ptr<PersistableQueue> q(queue);
- synclist.push_back(q);
- }
- dequeueAsync();
-}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
- if (_store){
- sys::ScopedLock<sys::Mutex> l(storeLock);
- asyncStore = _store;
- boost::weak_ptr<PersistableQueue> q(queue);
- synclist.push_back(q);
- }
- dequeueAsync();
-}
-
-void PersistableMessage::dequeueAsync() {
- sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- asyncDequeueCounter++;
-}
-
-PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
-
-// deprecated
-void PersistableMessage::setStore(MessageStore* s)
-{
- store = s;
-}
-
-void PersistableMessage::setStore(AsyncStore* s)
-{
- asyncStore = s;
-}
-
-void PersistableMessage::requestContentRelease()
-{
- contentReleaseState.requested = true;
-}
-void PersistableMessage::blockContentRelease()
-{
- contentReleaseState.blocked = true;
-}
-bool PersistableMessage::checkContentReleasable()
-{
- return contentReleaseState.requested && !contentReleaseState.blocked;
-}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {}
-bool PersistableMessage::isContentReleaseBlocked()
-{
- return contentReleaseState.blocked;
-}
-
-bool PersistableMessage::isContentReleaseRequested()
-{
- return contentReleaseState.requested;
-}
+bool PersistableMessage::isDequeueComplete() { return false; }
+void PersistableMessage::dequeueComplete() {}
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h Mon Aug 27 15:40:33 2012
@@ -24,30 +24,32 @@
#include <string>
#include <list>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <map>
+#include <boost/intrusive_ptr.hpp>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Persistable.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/amqp_framing.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/MessageHandle.h"
namespace qpid {
+namespace types {
+class Variant;
+}
namespace broker {
class MessageStore;
class AsyncStore;
+class Queue;
/**
* Base class for persistable messages.
*/
class PersistableMessage : public Persistable
{
- typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Mutex asyncDequeueLock;
- sys::Mutex storeLock;
-
/**
* "Ingress" messages == messages sent _to_ the broker.
* Tracks the number of outstanding asynchronous operations that must
@@ -57,72 +59,26 @@ class PersistableMessage : public Persis
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- AsyncCompletion ingressCompletion;
-
- /**
- * Tracks the number of outstanding asynchronous dequeue
- * operations. When the message is dequeued asynchronously the
- * count is incremented; when that dequeue completes it is
- * decremented. Thus when it is 0, there are no outstanding
- * dequeues.
- */
- int asyncDequeueCounter;
-
- void dequeueAsync();
-
- syncList synclist;
- struct ContentReleaseState
- {
- bool blocked;
- bool requested;
- bool released;
-
- ContentReleaseState();
- };
- ContentReleaseState contentReleaseState;
-
- protected:
- /** Called when all dequeues are complete for this message. */
- virtual void allDequeuesComplete() = 0;
-
- void setContentReleased();
-
- MessageStore* store; // deprecated, use AsyncStore
- AsyncStore* asyncStore; // new AsyncStore interface
-
+ AsyncCompletion* ingressCompletion;
+ boost::intrusive_ptr<AsyncCompletion> holder;
+ mutable uint64_t persistenceId;
+ MessageHandle msgHandle;
public:
- typedef boost::shared_ptr<PersistableMessage> shared_ptr;
-
- /**
- * @returns the size of the headers when encoded
- */
- virtual uint32_t encodedHeaderSize() const = 0;
-
- virtual ~PersistableMessage();
-
PersistableMessage();
+ virtual ~PersistableMessage();
void flush();
-
- QPID_BROKER_EXTERN bool isContentReleased() const;
-
- QPID_BROKER_EXTERN void setStore(MessageStore*); // deprecated
- QPID_BROKER_EXTERN void setStore(AsyncStore*);
- void requestContentRelease();
- void blockContentRelease();
- bool checkContentReleasable();
- bool isContentReleaseBlocked();
- bool isContentReleaseRequested();
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
/** track the progress of a message received by the broker - see ingressCompletion above */
- QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
- QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
+ QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); }
+ QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; }
+ QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
- QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
- QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
+ QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
+ QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
MessageStore* _store);
@@ -131,18 +87,23 @@ class PersistableMessage : public Persis
QPID_BROKER_EXTERN bool isDequeueComplete();
-
QPID_BROKER_EXTERN void dequeueComplete();
-
QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
MessageStore* _store);
QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
AsyncStore* _store);
- bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
-
- void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); // deprecated
- void addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store);
+ uint64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+ MessageHandle& getMessageHandle() { return msgHandle; }
+ const MessageHandle& getMessagehandle() const { return msgHandle; }
+
+
+ virtual void decodeHeader(framing::Buffer& buffer) = 0;
+ virtual void decodeContent(framing::Buffer& buffer) = 0;
+ virtual uint32_t encodedHeaderSize() const = 0;
+ virtual boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const = 0;
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp Mon Aug 27 15:40:33 2012
@@ -19,24 +19,53 @@
*
*/
#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
#include <cmath>
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
+namespace {
+class PriorityContext : public CursorContext {
+ public:
+ std::vector<QueueCursor> position;
+ PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
+};
+}
+
PriorityQueue::PriorityQueue(int l) :
levels(l),
- messages(levels, Deque()),
- frontLevel(0), haveFront(false), cached(false) {}
+ messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
+ counters(levels, framing::SequenceNumber()),
+ fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
+ frontLevel(0), haveFront(false), cached(false)
+{
+}
-bool PriorityQueue::deleted(const QueuedMessage& qm) {
- bool deleted = fifo.deleted(qm);
- if (deleted) erase(qm);
- return deleted;
+bool PriorityQueue::deleted(const QueueCursor& c)
+{
+ MessagePointer* ptr = fifo.find(c);
+ if (ptr && ptr->holder) {
+ //mark the message as deleted
+ ptr->holder->message.setState(DELETED);
+ //clean the deque for the relevant priority level
+ boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context);
+ messages[ptr->holder->priority].clean();
+ //stop referencing that message holder (it may now have been
+ //deleted)
+ ptr->holder = 0;
+ //clean fifo index
+ fifo.clean();
+ return true;
+ } else {
+ return false;
+ }
}
size_t PriorityQueue::size()
@@ -44,85 +73,69 @@ size_t PriorityQueue::size()
return fifo.size();
}
-namespace {
-bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
-}
-
-void PriorityQueue::release(const QueuedMessage& message)
+Message* PriorityQueue::next(QueueCursor& cursor)
{
- QueuedMessage* qm = fifo.releasePtr(message);
- if (qm) {
- uint p = getPriorityLevel(message);
- messages[p].insert(
- lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
- clearCache();
- }
-}
-
-
-void PriorityQueue::erase(const QueuedMessage& qm) {
- size_t i = getPriorityLevel(qm);
- if (!messages[i].empty()) {
- long diff = qm.position.getValue() - messages[i].front()->position.getValue();
- if (diff < 0) return;
- long maxEnd = std::min(size_t(diff), messages[i].size());
- QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
- Deque::iterator l =
- lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
- if (l != messages[i].end() && (*l)->position == qm.position) {
- messages[i].erase(l);
- clearCache();
- return;
+ boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
+ if (!ctxt) {
+ ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER));
+ cursor.context = ctxt;
+ }
+ if (cursor.type == REPLICATOR) {
+ //browse in fifo order
+ MessagePointer* ptr = fifo.next(cursor);
+ return ptr ? &(ptr->holder->message) : 0;
+ } else if (cursor.type == PURGE) {
+ //iterate over messages in reverse priority order (i.e. purge lowest priority message first)
+ //ignore any fairshare configuration here as well
+ for (int p = 0; p < levels; ++p) {
+ MessageHolder* holder = messages[p].next(ctxt->position[p]);
+ if (holder) {
+ cursor.setPosition(holder->message.getSequence(), 0);
+ return &(holder->message);
+ }
}
+ return 0;
+ } else {
+ //check each level in turn, in priority order, for any more messages
+ Priority p = firstLevel();
+ do {
+ MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
+ if (holder) {
+ cursor.setPosition(holder->message.getSequence(), 0);
+ return &(holder->message);
+ }
+ } while (nextLevel(p));
+ return 0;
}
}
-bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const QueueCursor& cursor)
{
- bool acquired = fifo.acquire(position, message);
- if (acquired) erase(message); // No longer available
- return acquired;
+ return find(cursor.position, 0);
}
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
- return fifo.find(position, message);
+ MessagePointer* ptr = fifo.find(position, cursor);
+ return ptr ? &(ptr->holder->message) : 0;
}
-bool PriorityQueue::browse(
- const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+void PriorityQueue::publish(const Message& published)
{
- return fifo.browse(position, message, unacquired);
+ MessageHolder holder;
+ holder.message = published;
+ holder.priority = getPriorityLevel(published);
+ holder.id = ++(counters[holder.priority]);
+ MessagePointer pointer;
+ pointer.holder = &(messages[holder.priority].publish(holder));
+ pointer.id = published.getSequence();
+ fifo.publish(pointer);
}
-bool PriorityQueue::consume(QueuedMessage& message)
+Message* PriorityQueue::release(const QueueCursor& cursor)
{
- if (checkFront()) {
- QueuedMessage* pm = messages[frontLevel].front();
- messages[frontLevel].pop_front();
- clearCache();
- pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
- message = *pm;
- return true;
- } else {
- return false;
- }
-}
-
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
- QueuedMessage* qmp = fifo.pushPtr(added);
- messages[getPriorityLevel(added)].push_back(qmp);
- clearCache();
- return false; // Adding a message never causes one to be removed for deque
-}
-
-void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
- fifo.updateAcquired(acquired);
-}
-
-void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
- fifo.setPosition(n);
+ MessagePointer* ptr = fifo.release(cursor);
+ return ptr ? &(ptr->holder->message) : 0;
}
void PriorityQueue::foreach(Functor f)
@@ -130,62 +143,87 @@ void PriorityQueue::foreach(Functor f)
fifo.foreach(f);
}
-void PriorityQueue::removeIf(Predicate p)
-{
- for (int priority = 0; priority < levels; ++priority) {
- for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
- if (p(**i)) {
- (*i)->status = QueuedMessage::DELETED; // Updates fifo index
- i = messages[priority].erase(i);
- clearCache();
- } else {
- ++i;
- }
- }
- }
- fifo.clean();
-}
-
-uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+uint PriorityQueue::getPriorityLevel(const Message& m) const
{
- uint priority = m.payload->getPriority();
+ uint priority = m.getPriority();
//Use AMQP 0-10 approach to mapping priorities to a fixed level
//(see rule priority-level-implementation)
const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
if (priority <= firstLevel) return 0;
return std::min(priority - firstLevel, (uint)levels-1);
}
+PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
+{
+ PriorityQueue::MessagePointer pointer;
+ pointer.holder = 0;
+ pointer.id = id;
+ return pointer;
+}
-void PriorityQueue::clearCache()
+PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
{
- cached = false;
+ PriorityQueue::MessageHolder holder;
+ holder.id = id;
+ holder.message.setState(DELETED);
+ return holder;
}
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+PriorityQueue::Priority PriorityQueue::firstLevel()
{
- for (int p = levels-1; p >= 0; --p) {
- if (!m[p].empty()) {
- l = p;
- return true;
- }
+ return Priority(levels - 1);
+}
+bool PriorityQueue::nextLevel(Priority& p)
+{
+ if (p.current > 0) {
+ --(p.current);
+ return true;
+ } else {
+ return false;
}
- return false;
}
-bool PriorityQueue::checkFront()
+framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
+{
+ return id;
+}
+void PriorityQueue::MessageHolder::setState(MessageState s)
{
- if (!cached) {
- haveFront = findFrontLevel(frontLevel, messages);
- cached = true;
+ message.setState(s);
+}
+MessageState PriorityQueue::MessageHolder::getState() const
+{
+ return message.getState();
+}
+PriorityQueue::MessageHolder::operator Message&()
+{
+ return message;
+}
+framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
+{
+ if (holder) {
+ return holder->message.getSequence();
+ } else {
+ //this is used when the instance is merely acting as padding
+ return id;
}
- return haveFront;
}
-
-uint PriorityQueue::getPriority(const QueuedMessage& message)
+void PriorityQueue::MessagePointer::setState(MessageState s)
{
- const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
- if (queue) return queue->getPriorityLevel(message);
- else return 0;
+ if (holder) {
+ holder->message.setState(s);
+ }
+}
+MessageState PriorityQueue::MessagePointer::getState() const
+{
+ if (holder) {
+ return holder->message.getState();
+ } else {
+ return DELETED;
+ }
+}
+PriorityQueue::MessagePointer::operator Message&()
+{
+ assert(holder);
+ return holder->message;
}
-
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h Mon Aug 27 15:40:33 2012
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/IndexedDeque.h"
#include "qpid/sys/IntegerTypes.h"
#include <deque>
#include <vector>
@@ -44,42 +45,63 @@ class PriorityQueue : public Messages
virtual ~PriorityQueue() {}
size_t size();
- bool deleted(const QueuedMessage&);
- void release(const QueuedMessage&);
- bool acquire(const framing::SequenceNumber&, QueuedMessage&);
- bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool consume(QueuedMessage&);
- bool push(const QueuedMessage& added, QueuedMessage& removed);
- void updateAcquired(const QueuedMessage& acquired);
- void setPosition(const framing::SequenceNumber&);
- void foreach(Functor);
- void removeIf(Predicate);
-
- static uint getPriority(const QueuedMessage&);
+ bool deleted(const QueueCursor&);
+ void publish(const Message& added);
+ Message* next(QueueCursor&);
+ Message* release(const QueueCursor& cursor);
+ Message* find(const QueueCursor&);
+ Message* find(const framing::SequenceNumber&, QueueCursor*);
+ void foreach(Functor);
+ static uint getPriority(const Message&);
protected:
- typedef std::deque<QueuedMessage*> Deque;
- typedef std::vector<Deque> PriorityLevels;
- virtual bool findFrontLevel(uint& p, PriorityLevels&);
-
const int levels;
+ struct Priority
+ {
+ const int start;
+ int current;
+ Priority(int s) : start(s), current(start) {}
+ };
+ virtual Priority firstLevel();
+ virtual bool nextLevel(Priority& );
private:
- /** Available messages separated by priority and sorted in priority order.
- * Holds pointers to the QueuedMessages in fifo
+ struct MessageHolder
+ {
+ Message message;
+ int priority;
+ framing::SequenceNumber id;
+ framing::SequenceNumber getSequence() const;
+ void setState(MessageState);
+ MessageState getState() const;
+ operator Message&();
+ };
+ struct MessagePointer
+ {
+ MessageHolder* holder;
+ framing::SequenceNumber id;//used only for padding
+ framing::SequenceNumber getSequence() const;
+ void setState(MessageState);
+ MessageState getState() const;
+ operator Message&();
+ };
+ typedef IndexedDeque<MessageHolder> Deque;
+ typedef std::vector<Deque> PriorityLevels;
+ typedef std::vector<framing::SequenceNumber> Counters;
+
+ /** Holds pointers to messages (stored in the fifo index) separated by priority.
*/
PriorityLevels messages;
- /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */
- MessageDeque fifo;
+ Counters counters;
+ /** FIFO index of messages for fast browsing and indexing */
+ IndexedDeque<MessagePointer> fifo;
uint frontLevel;
bool haveFront;
bool cached;
- void erase(const QueuedMessage&);
- uint getPriorityLevel(const QueuedMessage&) const;
- void clearCache();
- bool checkFront();
+ uint getPriorityLevel(const Message&) const;
+ MessageHolder priorityPadding(qpid::framing::SequenceNumber);
+ MessagePointer fifoPadding(qpid::framing::SequenceNumber);
};
}} // namespace qpid::broker
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org