You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC
svn commit: r1371676 [4/8] - in /qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/
cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/
cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Aug 10 12:04:27 2012
@@ -20,23 +20,23 @@
*/
#include "qpid/broker/Queue.h"
-
#include "qpid/broker/Broker.h"
-#include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/broker/QueueDepth.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/LegacyLVQ.h"
-#include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageDistributor.h"
+#include "qpid/broker/FifoDistributor.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueFlowLimit.h"
-#include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/MessageGroupManager.h"
+//TODO: get rid of this
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
@@ -76,26 +76,8 @@ namespace _qmf = qmf::org::apache::qpid:
namespace
{
-const std::string qpidMaxSize("qpid.max_size");
-const std::string qpidMaxCount("qpid.max_count");
-const std::string qpidNoLocal("no-local");
-const std::string qpidTraceIdentity("qpid.trace.id");
-const std::string qpidTraceExclude("qpid.trace.exclude");
-const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
-const std::string qpidLastValueQueue("qpid.last_value_queue");
-const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
-const std::string qpidPersistLastNode("qpid.persist_last_node");
-const std::string qpidVQMatchProperty("qpid.LVQ_key");
-const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
-const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
-//following feature is not ready for general use as it doesn't handle
-//the case where a message is enqueued on more than one queue well enough:
-const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
-
-const int ENQUEUE_ONLY=1;
-const int ENQUEUE_AND_DEQUEUE=2;
-inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+inline void mgntEnqStats(const Message& msg,
_qmf::Queue* mgmtObject,
_qmf::Broker* brokerMgmtObject)
{
@@ -103,12 +85,12 @@ inline void mgntEnqStats(const boost::in
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg->contentSize();
+ uint64_t contentSize = msg.getContentSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
bStats->byteTotalEnqueues += contentSize;
- if (msg->isPersistent ()) {
+ if (msg.isPersistent ()) {
qStats->msgPersistEnqueues += 1;
bStats->msgPersistEnqueues += 1;
qStats->bytePersistEnqueues += contentSize;
@@ -119,20 +101,20 @@ inline void mgntEnqStats(const boost::in
}
}
-inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+inline void mgntDeqStats(const Message& msg,
_qmf::Queue* mgmtObject,
_qmf::Broker* brokerMgmtObject)
{
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg->contentSize();
+ uint64_t contentSize = msg.getContentSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
qStats->byteTotalDequeues += contentSize;
bStats->byteTotalDequeues += contentSize;
- if (msg->isPersistent ()){
+ if (msg.isPersistent ()){
qStats->msgPersistDequeues += 1;
bStats->msgPersistDequeues += 1;
qStats->bytePersistDequeues += contentSize;
@@ -143,43 +125,81 @@ inline void mgntDeqStats(const boost::in
}
}
-} // namespace
+QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions)
+{
+ QueueSettings settings(inputs);
+ if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) {
+ settings.maxDepth.setSize(globalOptions.queueLimit);
+ }
+ return settings;
+}
+
+}
+
+Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {}
+bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
+{
+ try {
+ prepared = queue->enqueue(ctxt, message);
+ return true;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to prepare: " << e.what());
+ return false;
+ }
+}
+void Queue::TxPublish::commit() throw()
+{
+ try {
+ if (prepared) queue->process(message);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ }
+}
+void Queue::TxPublish::rollback() throw()
+{
+ try {
+ if (prepared) queue->enqueueAborted(message);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to rollback: " << e.what());
+ }
+}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, const QueueSettings& _settings,
MessageStore* const _store,
- const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
name(_name),
- autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(0),
consumerCount(0),
browserCount(0),
exclusive(0),
- noLocal(false),
persistLastNode(false),
inLastNodeFailure(false),
messages(new MessageDeque()),
persistenceId(0),
- policyExceeded(false),
+ settings(b ? merge(_settings, b->getOptions()) : _settings),
mgmtObject(0),
brokerMgmtObject(0),
eventMode(0),
- insertSeqNo(0),
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0),
allocator(new FifoDistributor( *messages ))
{
+ if (settings.maxDepth.hasCount()) current.setCount(0);
+ if (settings.maxDepth.hasSize()) current.setSize(0);
+ if (settings.traceExcludes.size()) {
+ split(traceExclude, settings.traceExcludes, ", ");
+ }
+ qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete);
- mgmtObject->set_exclusive(_owner != 0);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+ mgmtObject->set_arguments(settings.asMap());
agent->addObject(mgmtObject, 0, store != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
@@ -197,32 +217,36 @@ Queue::~Queue()
}
}
-bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+bool isLocalTo(const OwnershipToken* token, const Message& msg)
{
- return token && token->isLocal(msg->getPublisher());
+ return token && token->isLocal(msg.getPublisher());
}
-bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+bool Queue::isLocal(const Message& msg)
{
//message is considered local if it was published on the same
//connection as that of the session which declared this queue
//exclusive (owner) or which has an exclusive subscription
//(exclusive)
- return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+ return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
-bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
+bool Queue::isExcluded(const Message& msg)
{
- return traceExclude.size() && msg->isExcluded(traceExclude);
+ return traceExclude.size() && msg.isExcluded(traceExclude);
}
-void Queue::deliver(boost::intrusive_ptr<Message> msg){
+void Queue::deliver(Message msg, TxBuffer* txn){
+ //TODO: move some of this out of the queue and into the publishing
+ //'link' for whatever protocol is used; that would let protocol
+ //specific stuff be kept out the queue
+
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
- if (msg->isImmediate() && getConsumerCount() == 0) {
+ if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
- DeliverableMessage deliverable(msg);
+ DeliverableMessage deliverable(msg, 0);
alternateExchange->route(deliverable);
}
} else if (isLocal(msg)) {
@@ -232,47 +256,38 @@ void Queue::deliver(boost::intrusive_ptr
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
- enqueue(0, msg);
- push(msg);
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+ if (txn) {
+ TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
+ txn->enlist(op);
+ } else {
+ if (enqueue(0, msg)) {
+ push(msg);
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+ } else {
+ QPID_LOG(debug, "Message " << msg << " dropped from " << name);
+ }
+ }
}
}
-void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
+void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->recoverEnqueued(msg);
+ current += QueueDepth(1, msg.getContentSize());
}
-void Queue::recover(boost::intrusive_ptr<Message>& msg)
+void Queue::recover(Message& msg)
{
- {
- Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->recoverEnqueued(msg);
- }
-
+ recoverPrepared(msg);
push(msg, true);
- if (store){
- // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
- }
-
- if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
- //content has not been loaded, need to ensure that lazy loading mode is set:
- //TODO: find a nicer way to do this
- msg->releaseContent(store);
- // NOTE: The log message in this section are used for flow-to-disk testing (which checks the log for the
- // presence of this message). Do not change this without also checking these tests.
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released after recovery");
- }
}
-void Queue::process(boost::intrusive_ptr<Message>& msg){
+void Queue::process(Message& msg)
+{
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg->contentSize();
+ const uint64_t contentSize = msg.getContentSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
@@ -285,46 +300,22 @@ void Queue::process(boost::intrusive_ptr
}
}
-void Queue::requeue(const QueuedMessage& msg){
+void Queue::release(const QueueCursor& position, bool markRedelivered)
+{
assertClusterSafe();
QueueListeners::NotificationSet copy;
{
- if (!isEnqueued(msg)) return;
- if (deleted) {
- //
- // If the queue has been deleted, requeued messages must be sent to the alternate exchange
- // if one is configured.
- //
- if (alternateExchange.get()) {
- DeliverableMessage dmsg(msg.payload);
- alternateExchange->routeWithAlternate(dmsg);
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandonedViaAlt();
- } else {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandoned();
- }
- mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
- } else {
- {
- Mutex::ScopedLock locker(messageLock);
- messages->release(msg);
- observeRequeue(msg, locker);
+ Mutex::ScopedLock locker(messageLock);
+ if (!deleted) {
+ Message* message = messages->release(position);
+ if (message) {
+ if (!markRedelivered) message->undeliver();
listeners.populate(copy);
- }
-
- if (mgmtObject) {
- mgmtObject->inc_releases();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_releases();
- }
-
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
- if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
- msg.payload->forcePersistent();
- if (msg.payload->isForcedPersistent() ){
- boost::intrusive_ptr<Message> payload = msg.payload;
- enqueue(0, payload);
+ observeRequeue(*message, locker);
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
}
}
}
@@ -332,163 +323,118 @@ void Queue::requeue(const QueuedMessage&
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::dequeueMessageAt(const SequenceNumber& position)
{
- assertClusterSafe();
- QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (acquire(position, message)) {
- QPID_LOG(debug, "Acquired message at " << position << " from " << name);
- return true;
- } else {
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
- }
-}
-
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
-{
- assertClusterSafe();
- QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
- bool ok;
+ boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- ok = allocator->allocate( consumer, msg );
- }
- if (!ok) {
- QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
- return false;
- }
-
- QueuedMessage copy(msg);
- if (acquire( msg.position, copy)) {
- QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
- return true;
+ assertClusterSafe();
+ QPID_LOG(debug, "Attempting to dequeue message at " << position);
+ QueueCursor cursor;
+ Message* msg = messages->find(position, &cursor);
+ if (msg) {
+ if (msg->isPersistent()) pmsg = msg->getPersistentContext();
+ observeDequeue(*msg, locker);
+ messages->deleted(cursor);
+ } else {
+ QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
+ return false;
+ }
}
- QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
- return false;
+ dequeueFromStore(pmsg);
+ return true;
}
-void Queue::notifyListener()
+bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
+ Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
- QueueListeners::NotificationSet set;
- {
- Mutex::ScopedLock locker(messageLock);
- if (messages->size()) {
- listeners.populate(set);
- }
- }
- set.notify();
-}
+ Message* msg;
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
- checkNotDeleted(c);
- if (c->preAcquires()) {
- switch (consumeNextMessage(m, c)) {
- case CONSUMED:
- return true;
- case CANT_CONSUME:
- notifyListener();//let someone else try
- case NO_MESSAGES:
- default:
+ msg = messages->find(position);
+ if (msg) {
+ QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence());
+ if (!allocator->acquire(consumer, *msg)) {
+ QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name);
return false;
+ } else {
+ observeAcquire(*msg, locker);
+ QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name);
+ return true;
}
} else {
- return browseNextMessage(m, c);
+ QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name);
+ return false;
}
}
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
+ checkNotDeleted(c);
+ QueueListeners::NotificationSet set;
while (true) {
- QueuedMessage msg;
- bool found;
- {
- Mutex::ScopedLock locker(messageLock);
- found = allocator->nextConsumableMessage(c, msg);
- if (!found) listeners.addListener(c);
- }
- if (!found) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- return NO_MESSAGES;
- }
-
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- dequeue(0, msg);
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl();
- }
- continue;
- }
-
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- {
- Mutex::ScopedLock locker(messageLock);
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
- (void) ok; assert(ok);
- observeAcquire(msg, locker);
- }
+ //TODO: reduce lock scope
+ Mutex::ScopedLock locker(messageLock);
+ Message* msg = messages->next(*c);
+ if (msg) {
+ if (msg->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ observeDequeue(*msg, locker);
+ //ERROR: don't hold lock across call to store!!
+ if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
- mgmtObject->inc_acquires();
+ mgmtObject->inc_discardsTtl();
if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
+ brokerMgmtObject->inc_discardsTtl();
}
- m = msg;
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ messages->deleted(*c);
+ continue;
}
- } else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- }
-
- Mutex::ScopedLock locker(messageLock);
- messages->release(msg);
- return CANT_CONSUME;
- }
-}
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
- while (true) {
- QueuedMessage msg;
- bool found;
- {
- Mutex::ScopedLock locker(messageLock);
- found = allocator->nextBrowsableMessage(c, msg);
- if (!found) listeners.addListener(c);
- }
- if (!found) { // no next available
- QPID_LOG(debug, "No browsable messages available for consumer " <<
- c->getName() << " on queue '" << name << "'");
- return false;
- }
-
- if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
- if (c->accept(msg.payload)) {
- //consumer wants the message
- c->setPosition(msg.position);
- m = msg;
- return true;
+ if (c->filter(*msg)) {
+ if (c->accept(*msg)) {
+ if (c->preAcquires()) {
+ QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState());
+ if (allocator->acquire(c->getName(), *msg)) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+ observeAcquire(*msg, locker);
+ msg->deliver();
+ } else {
+ QPID_LOG(debug, "Could not acquire message from '" << name << "'");
+ continue; //try another message
+ }
+ }
+ QPID_LOG(debug, "Message retrieved from '" << name << "'");
+ m = *msg;
+ return true;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ if (c->preAcquires()) {
+ //let someone else try
+ listeners.populate(set);
+ }
+ break;
+ }
} else {
- //browser hasn't got enough credit for the message
- QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
- return false;
+ //consumer will never want this message, try another one
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ if (c->preAcquires()) {
+ //let someone else try to take this one
+ listeners.populate(set);
+ }
}
} else {
- //consumer will never want this message, continue seeking
- QPID_LOG(debug, "Browser skipping message from '" << name << "'");
- c->setPosition(msg.position);
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ listeners.addListener(c);
+ return false;
}
}
+ set.notify();
return false;
}
@@ -507,23 +453,28 @@ void Queue::removeListener(Consumer::sha
bool Queue::dispatch(Consumer::shared_ptr c)
{
- QueuedMessage msg(this);
+ Message msg;
if (getNextMessage(msg, c)) {
- c->deliver(msg);
+ c->deliver(*c, msg);
return true;
} else {
return false;
}
}
-bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
+bool Queue::find(SequenceNumber pos, Message& msg) const
+{
Mutex::ScopedLock locker(messageLock);
- if (messages->find(pos, msg))
+ Message* ptr = messages->find(pos, 0);
+ if (ptr) {
+ msg = *ptr;
return true;
+ }
return false;
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
+{
assertClusterSafe();
{
Mutex::ScopedLock locker(messageLock);
@@ -550,7 +501,7 @@ void Queue::consume(Consumer::shared_ptr
browserCount++;
consumerCount++;
//reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
+ if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
observeConsumerAdd(*c, locker);
@@ -559,7 +510,8 @@ void Queue::consume(Consumer::shared_ptr
mgmtObject->inc_consumerCount ();
}
-void Queue::cancel(Consumer::shared_ptr c){
+void Queue::cancel(Consumer::shared_ptr c)
+{
removeListener(c);
{
Mutex::ScopedLock locker(messageLock);
@@ -572,65 +524,6 @@ void Queue::cancel(Consumer::shared_ptr
mgmtObject->dec_consumerCount ();
}
-QueuedMessage Queue::get(){
- QueuedMessage msg(this);
- bool ok;
- {
- Mutex::ScopedLock locker(messageLock);
- ok = messages->consume(msg);
- if (ok) observeAcquire(msg, locker);
- }
-
- if (ok && mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
-
- return msg;
-}
-
-namespace {
-bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
- std::deque<QueuedMessage>& collection)
-{
- if (predicate(qm)) {
- collection.push_back(qm);
- return true;
- } else {
- return false;
- }
-}
-
-bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
-} // namespace
-
-void Queue::dequeueIf(Messages::Predicate predicate,
- std::deque<QueuedMessage>& dequeued)
-{
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
- }
- if (!dequeued.empty()) {
- if (mgmtObject) {
- mgmtObject->inc_acquires(dequeued.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires(dequeued.size());
- }
- for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
- i != dequeued.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
- }
- }
-}
-
/**
*@param lapse: time since the last purgeExpired
*/
@@ -642,13 +535,17 @@ void Queue::purgeExpired(sys::Duration l
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> dequeued;
- dequeueIf(boost::bind(&isExpired, _1), dequeued);
- if (dequeued.size()) {
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl(dequeued.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl(dequeued.size());
+ uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER);
+ QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
+ //
+ // Report the count of discarded-by-ttl messages
+ //
+ if (mgmtObject && count) {
+ mgmtObject->inc_acquires(count);
+ mgmtObject->inc_discardsTtl(count);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(count);
+ brokerMgmtObject->inc_discardsTtl(count);
}
}
}
@@ -663,7 +560,7 @@ namespace {
static const std::string typeKey;
static const std::string paramsKey;
static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
- virtual bool match( const QueuedMessage& ) const { return true; }
+ virtual bool match( const Message& ) const { return true; }
virtual ~MessageFilter() {}
protected:
MessageFilter() {};
@@ -687,13 +584,9 @@ namespace {
static const std::string valueKey;
HeaderMatchFilter( const std::string& _header, const std::string& _value )
: MessageFilter (), header(_header), value(_value) {}
- bool match( const QueuedMessage& msg ) const
+ bool match( const Message& msg ) const
{
- const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
- if (!headers) return false;
- FieldTable::ValuePtr h = headers->get(header);
- if (!h || !h->convertsTo<std::string>()) return false;
- return h->get<std::string>() == value;
+ return msg.getPropertyAsString(header) == value;
}
private:
const std::string header;
@@ -730,36 +623,68 @@ namespace {
return new MessageFilter();
}
- // used by removeIf() to collect all messages matching a filter, maximum match count is
- // optional.
- struct Collector {
- const uint32_t maxMatches;
- MessageFilter& filter;
- std::deque<QueuedMessage> matches;
- Collector(MessageFilter& filter, uint32_t max)
- : maxMatches(max), filter(filter) {}
- bool operator() (QueuedMessage& qm)
- {
- if (maxMatches == 0 || matches.size() < maxMatches) {
- if (filter.match( qm )) {
- matches.push_back(qm);
- return true;
- }
- }
+ bool reroute(boost::shared_ptr<Exchange> e, const Message& m)
+ {
+ if (e) {
+ DeliverableMessage d(m, 0);
+ d.getMessage().clearTrace();
+ e->routeWithAlternate(d);
+ return true;
+ } else {
return false;
}
- };
-
+ }
+ void moveTo(boost::shared_ptr<Queue> q, Message& m)
+ {
+ if (q) {
+ q->deliver(m);
+ }
+ }
} // end namespace
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type)
+{
+ std::deque<Message> removed;
+ {
+ QueueCursor c(type);
+ uint32_t count(0);
+ Mutex::ScopedLock locker(messageLock);
+ Message* m = messages->next(c);
+ while (m){
+ if (!p || p(*m)) {
+ if (!maxCount || count++ < maxCount) {
+ if (m->getState() == AVAILABLE) {
+ //don't actually acquire, just act as if we did
+ observeAcquire(*m, locker);
+ }
+ observeDequeue(*m, locker);
+ removed.push_back(*m);//takes a copy of the message
+ if (!messages->deleted(c)) {
+ QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
+ assert(false);
+ }
+ } else {
+ break;
+ }
+ }
+ m = messages->next(c);
+ }
+ }
+ for (std::deque<Message>::iterator i = removed.begin(); i != removed.end(); ++i) {
+ if (f) f(*i);//ERROR? need to clear old persistent context?
+ if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing
+ }
+ return removed.size();
+}
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
*
- * purge_request == 0 then purge all messages
- * == N then purge N messages from queue
- * Sometimes purge_request == 1 to unblock the top of queue
+ * qty == 0 then purge all messages
+ * == N then purge N messages from queue
+ * Sometimes qty == 1 to unblock the top of queue
*
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
@@ -768,172 +693,53 @@ namespace {
* An optional filter can be supplied that will be applied against each message. The
* message is purged only if the filter matches. See MessageDistributor for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- Collector c(*mf.get(), purge_request);
+ uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/);
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- }
-
- if (!c.matches.empty()) {
- if (mgmtObject) {
- mgmtObject->inc_acquires(c.matches.size());
- if (dest.get()) {
- mgmtObject->inc_reroutes(c.matches.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(c.matches.size());
- brokerMgmtObject->inc_reroutes(c.matches.size());
- }
- } else {
- mgmtObject->inc_discardsPurge(c.matches.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(c.matches.size());
- brokerMgmtObject->inc_discardsPurge(c.matches.size());
- }
- }
- }
-
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
-
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*qmsg, locker);
+ if (mgmtObject && count) {
+ mgmtObject->inc_acquires(count);
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(count);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(count);
+ brokerMgmtObject->inc_reroutes(count);
}
- dequeue(0, *qmsg);
- QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
- // now reroute if necessary
- if (dest.get()) {
- assert(qmsg->payload);
- qmsg->payload->clearTrace();
- DeliverableMessage dmsg(qmsg->payload);
- dest->routeWithAlternate(dmsg);
+ } else {
+ mgmtObject->inc_discardsPurge(count);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(count);
+ brokerMgmtObject->inc_discardsPurge(count);
}
}
}
- return c.matches.size();
+
+ return count;
}
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- Collector c(*mf.get(), qty);
-
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- }
-
-
- if (!c.matches.empty()) {
- // Update observers and message state:
-
- if (mgmtObject) {
- mgmtObject->inc_acquires(c.matches.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires(c.matches.size());
- }
-
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
- {
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*qmsg, locker);
- }
- dequeue(0, *qmsg);
- // and move to destination Queue.
- assert(qmsg->payload);
- destq->deliver(qmsg->payload);
- }
- }
- return c.matches.size();
+ return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/);
}
-/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
+void Queue::push(Message& message, bool /*isRecovery*/)
{
- bool ok;
- {
- Mutex::ScopedLock locker(messageLock);
- ok = messages->acquire(position, msg);
- if (ok) observeAcquire(msg, locker);
- }
- if (ok) {
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
- ++dequeueSincePurge;
- return true;
- }
- return false;
-}
-
-void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- QueuedMessage removed, qm(this, msg);
- bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
- qm.position = ++sequence;
- if (messages->push(qm, removed)) {
- dequeueRequired = true;
- observeAcquire(removed, locker);
- }
- observeEnqueue(qm, locker);
- if (policy.get()) {
- policy->enqueued(qm);
- }
+ message.setSequence(++sequence);
+ messages->publish(message);
listeners.populate(copy);
- }
- if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
-
- mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
-
- if (dequeueRequired) {
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- mgmtObject->inc_discardsLvq();
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires();
- brokerMgmtObject->inc_discardsLvq();
- }
- }
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- Mutex::ScopedLock locker(messageLock);
- pendingDequeues.push_back(removed);
- } else {
- dequeue(0, removed);
- }
+ observeEnqueue(message, locker);
}
copy.notify();
}
-void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
-{
- if (message.payload->isIngressComplete()) (*result)++;
-}
-
-/** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getEnqueueCompleteMessageCount() const
-{
- uint32_t count = 0;
- Mutex::ScopedLock locker(messageLock);
- messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
- return count;
-}
-
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
@@ -949,7 +755,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(messageLock);
- return autodelete && !consumerCount && !owner;
+ return settings.autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -957,14 +763,9 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
-void Queue::forcePersistent(QueuedMessage& message)
+void Queue::forcePersistent(const Message& /*message*/)
{
- if(!message.payload->isStoredOnQueue(shared_from_this())) {
- message.payload->forcePersistent();
- if (message.payload->isForcedPersistent() ){
- enqueue(0, message.payload);
- }
- }
+ //TODO
}
void Queue::setLastNodeFailure()
@@ -982,153 +783,129 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
+/*
+ * return true if enqueue succeeded and message should be made
+ * available; returning false will result in the message being dropped
+ */
+bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
- if (policy.get() && !suppressPolicyCheck) {
- std::deque<QueuedMessage> dequeues;
- {
- Mutex::ScopedLock locker(messageLock);
- try {
- policy->tryEnqueue(msg);
- } catch(ResourceLimitExceededException&) {
- if (mgmtObject) {
- mgmtObject->inc_discardsOverflow();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsOverflow();
- }
- throw;
- }
- policy->getPendingDequeues(dequeues);
- }
- //depending on policy, may have some dequeues that need to performed without holding the lock
-
- //
- // Count the dequeues as ring-discards. We know that these aren't rejects because
- // policy->tryEnqueue would have thrown an exception.
- //
- if (mgmtObject && !dequeues.empty()) {
- mgmtObject->inc_discardsRing(dequeues.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsRing(dequeues.size());
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+ return false;
}
-
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
- msg->forcePersistent();
+ forcePersistent(msg);
}
- if (traceId.size()) {
- msg->addTraceId(traceId);
+ if (settings.traceId.size()) {
+ msg.addTraceId(settings.traceId);
}
- if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
+ if (msg.isPersistent() && store) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
- msg->enqueueAsync(shared_from_this(), store);
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
+ assert(pmsg);
+ pmsg->enqueueAsync(shared_from_this(), store);
store->enqueue(ctxt, pmsg, *this);
- return true;
}
- if (!store) {
- //Messages enqueued on a transient queue should be prevented
- //from having their content released as it may not be
- //recoverable by these queue for delivery
- msg->blockContentRelease();
- }
- return false;
+ return true;
}
-void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+void Queue::enqueueAborted(const Message& msg)
{
+ //Called when any transactional enqueue is aborted (including but
+ //not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ current -= QueueDepth(1, msg.getContentSize());
}
-// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
+void Queue::enqueueCommited(Message& msg)
{
- ScopedUse u(barrier);
- if (!u.acquired) return false;
- {
- Mutex::ScopedLock locker(messageLock);
- if (!isEnqueued(msg)) return false;
- if (!ctxt) {
- if (policy.get()) policy->dequeued(msg);
- messages->deleted(msg);
- observeDequeue(msg, locker);
- }
+ //called when a recovered dtx enqueue operation is committed; the
+ //message is already on disk and space has been reserved in policy
+ //but it should now be made available
+ process(msg);
+}
+void Queue::dequeueAborted(Message& msg)
+{
+ //called when a recovered dtx dequeue operation is aborted; the
+ //message should be added back to the queue
+ push(msg);
+}
+void Queue::dequeueCommited(const Message& msg)
+{
+ //called when a recovered dtx dequeue operation is committed; the
+ //message will at this point have already been removed from the
+ //store and will not be available for delivery. The only action
+ //required is to ensure the observers are notified and the
+ //management stats are correctly decremented
+ Mutex::ScopedLock locker(messageLock);
+ observeDequeue(msg, locker);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTxnDequeues();
+ mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
}
+}
- if (!ctxt) {
- mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
- }
-
- // This check prevents messages which have been forced persistent on one queue from dequeuing
- // from another on which no forcing has taken place and thus causing a store error.
- bool fp = msg.payload->isForcedPersistent();
- if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) {
- if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
- msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(ctxt, pmsg, *this);
- return true;
- }
+
+void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
+{
+ ScopedUse u(barrier);
+ if (u.acquired && msg && store) {
+ store->dequeue(0, msg, *this);
}
- return false;
}
-void Queue::dequeueCommitted(const QueuedMessage& msg)
+void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return;
+ boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->dequeued(msg);
- messages->deleted(msg);
- observeDequeue(msg, locker);
+ Message* msg = messages->find(cursor);
+ if (msg) {
+ if (msg->isPersistent()) pmsg = msg->getPersistentContext();
+ if (!ctxt) {
+ observeDequeue(*msg, locker);
+ messages->deleted(cursor);//message pointer not valid after this
+ }
+ } else {
+ return;
+ }
}
- mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
- if (mgmtObject != 0) {
- _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg.payload->contentSize();
- qStats->msgTxnDequeues += 1;
- qStats->byteTxnDequeues += contentSize;
- mgmtObject->statisticsUpdated();
+ if (store && pmsg) {
+ store->dequeue(ctxt, pmsg, *this);
+ }
+}
+
+void Queue::dequeueCommitted(const QueueCursor& cursor)
+{
+ Mutex::ScopedLock locker(messageLock);
+ Message* msg = messages->find(cursor);
+ if (msg) {
+ const uint64_t contentSize = msg->getContentSize();
+ observeDequeue(*msg, locker);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTxnDequeues();
+ mgmtObject->inc_byteTxnDequeues(contentSize);
+ }
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
bStats->byteTxnDequeues += contentSize;
brokerMgmtObject->statisticsUpdated();
}
- }
-}
-
-/**
- * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
- * it from the logical (and persistent if applicable) queue
- */
-bool Queue::popAndDequeue(QueuedMessage& msg)
-{
- bool popped;
- {
- Mutex::ScopedLock locker(messageLock);
- popped = messages->consume(msg);
- if (popped) observeAcquire(msg, locker);
- }
- if (popped) {
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
- dequeue(0, msg);
- return true;
+ messages->deleted(cursor);
} else {
- return false;
+ QPID_LOG(error, "Could not find dequeued message on commit");
}
}
@@ -1136,8 +913,10 @@ bool Queue::popAndDequeue(QueuedMessage&
* Updates policy and management when a message has been dequeued,
* Requires messageLock be held by caller.
*/
-void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
{
+ current -= QueueDepth(1, msg.getContentSize());
+ mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -1150,7 +929,7 @@ void Queue::observeDequeue(const QueuedM
/** updates queue observers when a message has become unavailable for transfer.
* Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1164,7 +943,7 @@ void Queue::observeAcquire(const QueuedM
/** updates queue observers when a message has become re-available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1202,13 +981,11 @@ void Queue::observeConsumerRemove( const
}
-void Queue::create(const FieldTable& _settings)
+void Queue::create()
{
- settings = _settings;
if (store) {
- store->create(*this, _settings);
+ store->create(*this, settings.storeSettings);
}
- configureImpl(_settings);
}
@@ -1258,112 +1035,21 @@ bool getBoolSetting(const qpid::framing:
}
}
-void Queue::configure(const FieldTable& _settings)
+void Queue::abandoned(const Message& message)
{
- settings = _settings;
- configureImpl(settings);
-}
-
-void Queue::configureImpl(const FieldTable& _settings)
-{
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
- if (eventMode && broker) {
- broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
- }
-
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
- if ( NullMessageStore::isNullStore(store)) {
- QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (broker && !(broker->getQueueEvents().isSync()) ) {
- QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
- }
- FieldTable copy(_settings);
- copy.erase(QueuePolicy::typeKey);
- setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
- } else {
- setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
- }
- if (broker && broker->getManagementAgent()) {
- ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio);
- }
-
- //set this regardless of owner to allow use of no-local with exclusive consumers also
- noLocal = getBoolSetting(_settings, qpidNoLocal);
- QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
-
- std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
- if (lvqKey.size()) {
- QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
- messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
- QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
- messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
- QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
- messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else {
- std::auto_ptr<Messages> m = Fairshare::create(_settings);
- if (m.get()) {
- messages = m;
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
- } else { // default (FIFO) queue type
- // override default message allocator if message groups configured.
- boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
- if (mgm) {
- allocator = mgm;
- addObserver(mgm);
- }
- }
- }
-
- persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
-
- traceId = _settings.getAsString(qpidTraceIdentity);
- std::string excludeList = _settings.getAsString(qpidTraceExclude);
- if (excludeList.size()) {
- split(traceExclude, excludeList, ", ");
- }
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
- << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
-
- FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
- if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
-
- autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
-
- if (mgmtObject != 0) {
- mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
- }
-
- QueueFlowLimit::observe(*this, _settings);
+ if (reroute(alternateExchange, message) && brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ else if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
void Queue::destroyed()
{
unbind(broker->getExchanges());
-
- QueuedMessage m;
- while(popAndDequeue(m)) {
- DeliverableMessage msg(m.payload);
- if (alternateExchange.get()) {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandonedViaAlt();
- alternateExchange->routeWithAlternate(msg);
- } else {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandoned();
- }
- }
- if (alternateExchange.get())
+ remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/);
+ if (alternateExchange.get()) {
alternateExchange->decAlternateUsers();
+ }
if (store) {
barrier.destroy();
@@ -1401,20 +1087,6 @@ void Queue::unbind(ExchangeRegistry& exc
bindings.unbind(exchanges, shared_from_this());
}
-void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
-{
- Mutex::ScopedLock locker(messageLock);
- policy = _policy;
- if (policy.get())
- policy->setQueue(this);
-}
-
-const QueuePolicy* Queue::getPolicy()
-{
- Mutex::ScopedLock locker(messageLock);
- return policy.get();
-}
-
uint64_t Queue::getPersistenceId() const
{
return persistenceId;
@@ -1434,10 +1106,7 @@ void Queue::setPersistenceId(uint64_t _p
void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
- buffer.put(settings);
- if (policy.get()) {
- buffer.put(*policy);
- }
+ buffer.put(encodableSettings);
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
}
@@ -1445,21 +1114,19 @@ uint32_t Queue::encodedSize() const
{
return name.size() + 1/*short string size octet*/
+ (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
- + settings.encodedSize()
- + (policy.get() ? (*policy).encodedSize() : 0);
+ + encodableSettings.encodedSize();
}
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
string name;
buffer.getShortString(name);
- FieldTable settings;
- buffer.get(settings);
+ FieldTable ft;
+ buffer.get(ft);
boost::shared_ptr<Exchange> alternate;
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true);
- if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
- buffer.get ( *(result.first->policy) );
- }
+ QueueSettings settings(true, false);
+ settings.populate(ft, settings.storeSettings);
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate, true);
if (buffer.available()) {
string altExch;
buffer.getShortString(altExch);
@@ -1523,8 +1190,8 @@ struct AutoDeleteTask : qpid::sys::Timer
void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
- if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
- AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
@@ -1543,12 +1210,15 @@ void Queue::releaseExclusiveOwnership()
{
Mutex::ScopedLock locker(ownershipLock);
owner = 0;
+ if (mgmtObject) {
+ mgmtObject->set_exclusive(false);
+ }
}
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
//reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
+ if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
Mutex::ScopedLock locker(ownershipLock);
@@ -1556,6 +1226,9 @@ bool Queue::setExclusiveOwner(const Owne
return false;
} else {
owner = o;
+ if (mgmtObject) {
+ mgmtObject->set_exclusive(true);
+ }
return true;
}
}
@@ -1687,7 +1360,7 @@ namespace {
struct After {
framing::SequenceNumber seq;
After(framing::SequenceNumber s) : seq(s) {}
- bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+ bool operator()(const Message& m) { return m.getSequence() > seq; }
};
} // namespace
@@ -1695,12 +1368,10 @@ struct After {
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
if (n < sequence) {
- std::deque<QueuedMessage> dequeued;
- dequeueIf(After(n), dequeued);
- messages->setPosition(n);
+ remove(0, After(n), MessagePredicate(), BROWSER);
}
sequence = n;
- QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
+ QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
@@ -1721,25 +1392,16 @@ void Queue::recoveryComplete(ExchangeReg
<< "\": exchange does not exist.");
}
//process any pending dequeues
- std::deque<QueuedMessage> pd;
- {
- Mutex::ScopedLock locker(messageLock);
- pendingDequeues.swap(pd);
+ for (std::vector<Message>::iterator i = pendingDequeues.begin(); i != pendingDequeues.end(); ++i) {
+ dequeueFromStore(i->getPersistentContext());
}
- for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
-}
-
-void Queue::insertSequenceNumbers(const std::string& key)
-{
- seqNoKey = key;
- insertSeqNo = !seqNoKey.empty();
- QPID_LOG(debug, "Inserting sequence numbers as " << key);
+ pendingDequeues.clear();
}
/** updates queue observers and state when a message has become available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1748,32 +1410,7 @@ void Queue::observeEnqueue(const QueuedM
QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
}
-}
-
-void Queue::updateEnqueued(const QueuedMessage& m)
-{
- if (m.payload) {
- boost::intrusive_ptr<Message> payload = m.payload;
- enqueue(0, payload, true);
- {
- Mutex::ScopedLock locker(messageLock);
- messages->updateAcquired(m);
- observeEnqueue(m, locker);
- if (policy.get()) {
- policy->recoverEnqueued(payload);
- policy->enqueued(m);
- }
- }
- mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
- } else {
- QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
- }
-}
-
-bool Queue::isEnqueued(const QueuedMessage& msg)
-{
- Mutex::ScopedLock locker(messageLock);
- return !policy.get() || policy->isEnqueued(msg);
+ mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's
@@ -1835,28 +1472,82 @@ void Queue::setDequeueSincePurge(uint32_
dequeueSincePurge = value;
}
-namespace{
-class FindLowest
+void Queue::reject(const QueueCursor& cursor)
{
- public:
- FindLowest() : init(false) {}
- void process(const QueuedMessage& message) {
- QPID_LOG(debug, "FindLowest processing: " << message.position);
- if (!init || message.position < lowest) lowest = message.position;
- init = true;
- }
- bool getLowest(qpid::framing::SequenceNumber& result) {
- if (init) {
- result = lowest;
- return true;
+ Exchange::shared_ptr alternate = getAlternateExchange();
+ Message copy;
+ boost::intrusive_ptr<PersistableMessage> pmsg;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ Message* message = messages->find(cursor);
+ if (message) {
+ if (alternate) copy = *message;
+ if (message->isPersistent()) pmsg = message->getPersistentContext();
+ countRejected();
+ observeDequeue(*message, locker);
+ messages->deleted(cursor);
} else {
- return false;
+ return;
}
}
- private:
- bool init;
- qpid::framing::SequenceNumber lowest;
-};
+ if (alternate) {
+ copy.resetDeliveryCount();
+ DeliverableMessage delivery(copy, 0);
+ alternate->routeWithAlternate(delivery);
+ QPID_LOG(info, "Routed rejected message from " << getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << getName());
+ }
+ dequeueFromStore(pmsg);
+}
+
+bool Queue::checkDepth(const QueueDepth& increment, const Message&)
+{
+ if (current && (settings.maxDepth - current < increment)) {
+ if (mgmtObject) {
+ mgmtObject->inc_discardsOverflow();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsOverflow();
+ }
+ throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
+ } else {
+ current += increment;
+ return true;
+ }
+}
+
+bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate)
+{
+ Mutex::ScopedLock locker(messageLock);
+ //hold lock across calls to predicate, or take copy of message?
+ //currently hold lock, may want to revise depending on any new use
+ //cases
+ Message* message = messages->next(cursor);
+ while (message && (predicate && !predicate(*message))) {
+ message = messages->next(cursor);
+ }
+ return message != 0;
+}
+
+bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start)
+{
+ Mutex::ScopedLock locker(messageLock);
+ //hold lock across calls to predicate, or take copy of message?
+ //currently hold lock, may want to revise depending on any new use
+ //cases
+ Message* message;
+ message = messages->find(start, &cursor);
+ if (message && (!predicate || predicate(*message))) return true;
+
+ return seek(cursor, predicate);
+}
+
+bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start)
+{
+ Mutex::ScopedLock locker(messageLock);
+ return messages->find(start, &cursor);
}
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Aug 10 12:04:27 2012
@@ -28,12 +28,14 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/TxOp.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
@@ -56,10 +58,14 @@
namespace qpid {
namespace broker {
class Broker;
+class Exchange;
class MessageStore;
+class QueueDepth;
class QueueEvents;
class QueueRegistry;
+class QueueFactory;
class TransactionContext;
+class TxBuffer;
class MessageDistributor;
/**
@@ -70,7 +76,9 @@ class MessageDistributor;
*/
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
-
+ public:
+ typedef boost::function1<bool, const Message&> MessagePredicate;
+ protected:
struct UsageBarrier
{
Queue& parent;
@@ -90,31 +98,40 @@ class Queue : public boost::enable_share
~ScopedUse() { if (acquired) barrier.release(); }
};
+ class TxPublish : public TxOp
+ {
+ Message message;
+ boost::shared_ptr<Queue> queue;
+ bool prepared;
+ public:
+ TxPublish(const Message&,boost::shared_ptr<Queue>);
+ bool prepare(TransactionContext* ctxt) throw();
+ void commit() throw();
+ void rollback() throw();
+ };
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+ typedef boost::function1<void, Message&> MessageFunctor;
const std::string name;
- const bool autodelete;
MessageStore* store;
const OwnershipToken* owner;
uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
uint32_t browserCount; // Count of non-acquiring subscriptions.
OwnershipToken* exclusive;
- bool noLocal;
bool persistLastNode;
bool inLastNodeFailure;
- std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
std::auto_ptr<Messages> messages;
- std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
+ std::vector<Message> pendingDequeues;
/** messageLock is used to keep the Queue's state consistent while processing message
* events, such as message dispatch, enqueue, acquire, and dequeue. It must be held
* while updating certain members in order to keep these members consistent with
* each other:
* o messages
* o sequence
- * o policy
* o listeners
* o allocator
* o observeXXX() methods
@@ -127,9 +144,9 @@ class Queue : public boost::enable_share
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
- framing::FieldTable settings;
- std::auto_ptr<QueuePolicy> policy;
- bool policyExceeded;
+ const QueueSettings settings;
+ qpid::framing::FieldTable encodableSettings;
+ QueueDepth current;
QueueBindings bindings;
std::string alternateExchangeName;
boost::shared_ptr<Exchange> alternateExchange;
@@ -139,43 +156,42 @@ class Queue : public boost::enable_share
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
- bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
bool deleted;
UsageBarrier barrier;
- int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageDistributor> allocator;
- void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
- void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- void notifyListener();
+ virtual void push(Message& msg, bool isRecovery=false);
+ void process(Message& msg);
+ bool enqueue(TransactionContext* ctxt, Message& msg);
+ bool getNextMessage(Message& msg, Consumer::shared_ptr& c);
void removeListener(Consumer::shared_ptr);
- bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ bool isExcluded(const Message& msg);
- /** update queue observers, stats, policy, etc when the messages' state changes.
- * messageLock is held by caller */
- void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+ * must be held by caller */
+ void observeEnqueue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeAcquire(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeRequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
- bool popAndDequeue(QueuedMessage&);
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
- void forcePersistent(QueuedMessage& msg);
+ bool acquire(const qpid::framing::SequenceNumber& position, Message& msg,
+ const qpid::sys::Mutex::ScopedLock& locker);
+
+ void forcePersistent(const Message& msg);
int getEventMode();
- void configureImpl(const qpid::framing::FieldTable& settings);
- void checkNotDeleted(const Consumer::shared_ptr& c);
+ void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
+ void abandoned(const Message& message);
+ void checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
- void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
+ uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
+ virtual bool checkDepth(const QueueDepth& increment, const Message&);
public:
@@ -184,12 +200,11 @@ class Queue : public boost::enable_share
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
+ const QueueSettings& settings = QueueSettings(),
MessageStore* const store = 0,
- const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
- QPID_BROKER_EXTERN ~Queue();
+ QPID_BROKER_EXTERN virtual ~Queue();
/** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
@@ -198,19 +213,13 @@ class Queue : public boost::enable_share
* @param msg - message to be acquired.
* @return false if message is no longer available for acquire.
*/
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
+ QPID_BROKER_EXTERN bool acquire(const QueueCursor& msg, const std::string& consumer);
/**
- * Used to configure a new queue and create a persistent record
- * for it in store if required.
+ * Used to create a persistent record for the queue in store if required.
*/
- QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
+ QPID_BROKER_EXTERN void create();
- /**
- * Used to reconfigure a recovered queue (does not create
- * persistent record in store).
- */
- QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
void destroyed();
QPID_BROKER_EXTERN void bound(const std::string& exchange,
const std::string& key,
@@ -224,34 +233,36 @@ class Queue : public boost::enable_share
boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
- /** Acquire the message at the given position if it is available for acquire. Not to
- * be used by clients, but used by the broker for queue management.
- * @param message - set to the acquired message if true returned.
- * @return true if the message has been acquired.
+ /**
+ * Removes (and dequeues) a message by its sequence number (used
+ * for some broker features, e.g. queue replication)
+ *
+ * @param position the sequence number of the message to be dequeued.
+ * @return true if the message is dequeued.
*/
- QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
+ QPID_BROKER_EXTERN bool dequeueMessageAt(const qpid::framing::SequenceNumber& position);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
*/
- QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
- /**
- * Dispatches the messages immediately to a consumer if
- * one is available or stores it for later if not.
- */
- QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
/**
* Returns a message to the in-memory queue (due to lack
* of acknowledgement from a receiver). If a consumer is
* available it will be dispatched immediately, else it
* will be returned to the front of the queue.
*/
- QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
+ QPID_BROKER_EXTERN void release(const QueueCursor& msg, bool markRedelivered=true);
+ QPID_BROKER_EXTERN void reject(const QueueCursor& msg);
+
+ QPID_BROKER_EXTERN bool seek(QueueCursor&, MessagePredicate);
+ QPID_BROKER_EXTERN bool seek(QueueCursor&, MessagePredicate, qpid::framing::SequenceNumber start);
+ QPID_BROKER_EXTERN bool seek(QueueCursor&, qpid::framing::SequenceNumber start);
/**
* Used during recovery to add stored messages back to the queue
*/
- QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void recover(Message& msg);
QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
bool exclusive = false);
@@ -268,7 +279,6 @@ class Queue : public boost::enable_share
const qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
- QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const std::string& getName() const { return name; }
QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
@@ -277,8 +287,9 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
- inline const framing::FieldTable& getSettings() const { return settings; }
- inline bool isAutoDelete() const { return autodelete; }
+ inline const QueueSettings& getSettings() const { return settings; }
+ inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
+ inline bool isAutoDelete() const { return settings.autodelete; }
QPID_BROKER_EXTERN bool canAutoDelete() const;
const QueueBindings& getBindings() const { return bindings; }
@@ -288,48 +299,22 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN void setLastNodeFailure();
QPID_BROKER_EXTERN void clearLastNodeFailure();
- QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
- QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once the message is acknowledged)
*/
- QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+ QPID_BROKER_EXTERN void dequeue(TransactionContext* ctxt, const QueueCursor&);
/**
* Inform the queue that a previous transactional dequeue
* committed.
*/
- QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg);
-
- /**
- * Inform queue of messages that were enqueued, have since
- * been acquired but not yet accepted or released (and
- * thus are still logically on the queue) - used in
- * clustered broker.
- */
- QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg);
-
- /**
- * Test whether the specified message (identified by its
- * sequence/position), is still enqueued (note this
- * doesn't mean it is available for delivery as it may
- * have been delievered to a subscriber who has not yet
- * accepted it).
- */
- QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg);
-
- /**
- * Acquires the next available (oldest) message
- */
- QPID_BROKER_EXTERN QueuedMessage get();
+ void dequeueCommitted(const QueueCursor& msg);
/** Get the message at position pos, returns true if found and sets msg */
- QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
-
- QPID_BROKER_EXTERN const QueuePolicy* getPolicy();
+ QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, Message& msg ) const;
QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
- QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN bool isLocal(const Message& msg);
//PersistableQueue support:
QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
@@ -410,7 +395,11 @@ class Queue : public boost::enable_share
* Reserve space in policy for an enqueued message that
* has been recovered in the prepared state (dtx only)
*/
- QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void recoverPrepared(const Message& msg);
+ void enqueueAborted(const Message& msg);
+ void enqueueCommited(Message& msg);
+ void dequeueAborted(Message& msg);
+ void dequeueCommited(const Message& msg);
QPID_BROKER_EXTERN void flush();
@@ -418,6 +407,7 @@ class Queue : public boost::enable_share
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+ friend class QueueFactory;
};
}
}
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp (from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp&r1=1371647&r2=1371676&rev=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp Fri Aug 10 12:04:27 2012
@@ -18,21 +18,27 @@
* under the License.
*
*/
-#include "qpid/broker/ExpiryPolicy.h"
+#include "QueueCursor.h"
#include "qpid/broker/Message.h"
-#include "qpid/sys/Time.h"
namespace qpid {
namespace broker {
+QueueCursor::QueueCursor(SubscriptionType t) : type(t), position(0), version(0), valid(false) {} // starts out not valid; setPosition() is what sets valid to true
-ExpiryPolicy::~ExpiryPolicy() {}
-
-bool ExpiryPolicy::hasExpired(Message& m) {
- return m.getExpiration() < sys::AbsTime::now();
+void QueueCursor::setPosition(int32_t p, int32_t v) // stores position p together with version v and marks the cursor valid
+{
+ position = p;
+ version = v; // isValid() later compares its argument against this value
+ valid = true;
}
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
- return sys::AbsTime::now();
+bool QueueCursor::check(const Message& m) // true when the state of m is acceptable for this cursor's type
+{
+ return (m.getState() == AVAILABLE || ((type == REPLICATOR || type == PURGE) && m.getState() == ACQUIRED)); // AVAILABLE passes for every type; ACQUIRED passes only for REPLICATOR and PURGE cursors
}
+bool QueueCursor::isValid(int32_t v) // false if the cursor was never positioned or was already invalidated
+{
+ return valid && (valid = (v == version)); // the single '=' assigns: while valid, 'valid' becomes (v == version) and that is the result, so a mismatch keeps the cursor invalid until setPosition() runs again -- presumably intentional, TODO confirm
+}
}} // namespace qpid::broker
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h (from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h&r1=1371647&r2=1371676&rev=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h Fri Aug 10 12:04:27 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_EXPIRYPOLICY_H
-#define QPID_BROKER_EXPIRYPOLICY_H
+#ifndef QPID_BROKER_QUEUECURSOR_H
+#define QPID_BROKER_QUEUECURSOR_H
/*
*
@@ -21,30 +21,51 @@
* under the License.
*
*/
-
-#include "qpid/RefCounted.h"
#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
-
-namespace sys {
-class AbsTime;
-}
-
namespace broker {
class Message;
+enum SubscriptionType // type tag stored in each QueueCursor
+{
+ CONSUMER, // default argument of the QueueCursor constructor
+ BROWSER,
+ PURGE, // QueueCursor::check() also accepts ACQUIRED messages for this type
+ REPLICATOR // QueueCursor::check() also accepts ACQUIRED messages for this type
+};
+
+class CursorContext { // declares nothing but a virtual destructor; QueueCursor holds one through boost::shared_ptr<CursorContext>
+ public:
+ virtual ~CursorContext() {}
+};
/**
- * Default expiry policy.
+ * Cursor state: a SubscriptionType, a position/version pair, a validity flag and a shared CursorContext pointer. Only the constructor is public; the data members and the setPosition/check/isValid helpers are private and reachable only through the friend classes declared at the end of the class.
*/
-class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
+class QueueCursor
{
public:
- QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
- QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
- QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
+ QPID_BROKER_EXTERN QueueCursor(SubscriptionType type = CONSUMER);
+
+ private:
+ SubscriptionType type; // consulted by check() to decide whether ACQUIRED messages are accepted
+ int32_t position; // written by setPosition()
+ int32_t version; // written by setPosition(), compared by isValid()
+ bool valid; // false after construction, set by setPosition(), may be cleared by isValid()
+ boost::shared_ptr<CursorContext> context; // not initialised explicitly by the constructor, so it starts empty
+
+ void setPosition(int32_t p, int32_t v);
+ bool check(const Message& m);
+ bool isValid(int32_t v);
+
+ friend class MessageDeque;
+ friend class MessageMap;
+ friend class PriorityQueue;
+ template <typename T> friend class IndexedDeque;
};
}} // namespace qpid::broker
-#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/
+#endif /*!QPID_BROKER_QUEUECURSOR_H*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org