You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [9/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,10 +31,7 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/MessageGroupManager.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -44,7 +41,6 @@
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
-#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -68,7 +64,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -90,16 +86,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -114,8 +110,7 @@ Queue::Queue(const string& _name, bool _
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0),
- allocator(new FifoDistributor( *messages ))
+ autoDeleteTimeout(0)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -168,8 +163,13 @@ void Queue::deliver(boost::intrusive_ptr
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
- enqueue(0, msg);
- push(msg);
+ // if no store then mark as enqueued
+ if (!enqueue(0, msg)){
+ push(msg);
+ msg->enqueueComplete();
+ }else {
+ push(msg);
+ }
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -183,10 +183,11 @@ void Queue::recover(boost::intrusive_ptr
if (policy.get()) policy->recoverEnqueued(msg);
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
+ msg->enqueueComplete(); // mark the message as enqueued
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -210,13 +211,14 @@ void Queue::process(boost::intrusive_ptr
void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
+ msg.payload->enqueueComplete(); // mark the message as enqueued
messages->reinsert(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -224,17 +226,16 @@ void Queue::requeue(const QueuedMessage&
enqueue(0, payload);
}
}
- observeRequeue(msg, locker);
}
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (acquire(position, message, locker)) {
+ if (messages->remove(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -243,24 +244,9 @@ bool Queue::acquireMessageAt(const Seque
}
}
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
-{
- Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
- QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-
- if (!allocator->allocate( consumer, msg )) {
- QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
- return false;
- }
-
- QueuedMessage copy(msg);
- if (acquire( msg.position, copy, locker)) {
- QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
- return true;
- }
- QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
- return false;
+bool Queue::acquire(const QueuedMessage& msg) {
+ QueuedMessage copy = msg;
+ return acquireMessageAt(msg.position, copy);
}
void Queue::notifyListener()
@@ -276,7 +262,7 @@ void Queue::notifyListener()
set.notify();
}
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
checkNotDeleted();
if (c->preAcquires()) {
@@ -288,71 +274,52 @@ bool Queue::getNextMessage(QueuedMessage
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
}
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
-
- if (!allocator->nextConsumableMessage(c, msg)) { // no next available
- QPID_LOG(debug, "No messages available to dispatch to consumer " <<
- c->getName() << " on queue '" << name << "'");
+ if (messages->empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
- }
-
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->position = msg.position;
- acquire( msg.position, msg, locker);
- dequeue( 0, msg );
- continue;
- }
-
- // a message is available for this consumer - can the consumer use it?
+ } else {
+ QueuedMessage msg = messages->front();
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ popAndDequeue();
+ continue;
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
- (void) ok; assert(ok);
- ok = acquire( msg.position, msg, locker);
- (void) ok; assert(ok);
- m = msg;
- c->position = m.position;
- return CONSUMED;
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ m = msg;
+ pop();
+ return CONSUMED;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ return CANT_CONSUME;
+ }
} else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
- } else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- c->position = msg.position;
- return CANT_CONSUME;
+ }
}
}
}
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
- while (true) {
- Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
-
- if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
- QPID_LOG(debug, "No browsable messages available for consumer " <<
- c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
- return false;
- }
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+{
+ QueuedMessage msg(this);
+ while (seek(msg, c)) {
if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
@@ -366,8 +333,8 @@ bool Queue::browseNextMessage(QueuedMess
}
} else {
//consumer will never want this message, continue seeking
- QPID_LOG(debug, "Browser skipping message from '" << name << "'");
c->position = msg.position;
+ QPID_LOG(debug, "Browser skipping message from '" << name << "'");
}
}
return false;
@@ -397,71 +364,61 @@ bool Queue::dispatch(Consumer::shared_pt
}
}
-bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
-
+// Find the next message
+bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (messages->find(pos, msg))
+ if (messages->next(c->position, msg)) {
return true;
- return false;
+ } else {
+ listeners.addListener(c);
+ return false;
+ }
+}
+
+QueuedMessage Queue::find(SequenceNumber pos) const {
+
+ Mutex::ScopedLock locker(messageLock);
+ QueuedMessage msg;
+ messages->find(pos, msg);
+ return msg;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
- {
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
- }
- }
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
- //reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
- autoDeleteTask->cancel();
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
}
}
- Mutex::ScopedLock locker(messageLock);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerAdded(*c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
- }
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
}
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- {
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
- }
- Mutex::ScopedLock locker(messageLock);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerRemoved(*c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
- }
- }
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
}
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->pop(msg))
- observeAcquire(msg, locker);
+ messages->pop(msg);
return msg;
}
@@ -475,135 +432,22 @@ bool collect_if_expired(std::deque<Queue
}
}
-/**
- *@param lapse: time since the last purgeExpired
- */
-void Queue::purgeExpired(qpid::sys::Duration lapse)
+void Queue::purgeExpired()
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
- int count = dequeueSincePurge.get();
- dequeueSincePurge -= count;
- int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
- if (seconds == 0 || count / seconds < 1) {
+ //attempt is less than one per second.
+
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
- }
-
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
+ messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
+ for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
-
-namespace {
- // for use with purge/move below - collect messages that match a given filter
- //
- class MessageFilter
- {
- public:
- static const std::string typeKey;
- static const std::string paramsKey;
- static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
- virtual bool match( const QueuedMessage& ) const { return true; }
- virtual ~MessageFilter() {}
- protected:
- MessageFilter() {};
- };
- const std::string MessageFilter::typeKey("filter_type");
- const std::string MessageFilter::paramsKey("filter_params");
-
- // filter by message header string value exact match
- class HeaderMatchFilter : public MessageFilter
- {
- public:
- /* Config:
- { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "<header name>",
- 'header_value' : "<value to match>"
- }
- }
- */
- static const std::string typeKey;
- static const std::string headerKey;
- static const std::string valueKey;
- HeaderMatchFilter( const std::string& _header, const std::string& _value )
- : MessageFilter (), header(_header), value(_value) {}
- bool match( const QueuedMessage& msg ) const
- {
- const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
- if (!headers) return false;
- FieldTable::ValuePtr h = headers->get(header);
- if (!h || !h->convertsTo<std::string>()) return false;
- return h->get<std::string>() == value;
- }
- private:
- const std::string header;
- const std::string value;
- };
- const std::string HeaderMatchFilter::typeKey("header_match_str");
- const std::string HeaderMatchFilter::headerKey("header_key");
- const std::string HeaderMatchFilter::valueKey("header_value");
-
- // factory to create correct filter based on map
- MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
- {
- using namespace qpid::types;
- if (filter && !filter->empty()) {
- Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
- if (i != filter->end()) {
-
- if (i->second.asString() == HeaderMatchFilter::typeKey) {
- Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
- if (p != filter->end() && p->second.getType() == VAR_MAP) {
- Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
- Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
- if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
- std::string headerKey(k->second.asString());
- std::string value(v->second.asString());
- QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value );
- return new HeaderMatchFilter( headerKey, value );
- }
- }
- }
- }
- QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
- }
- return new MessageFilter();
- }
-
- // used by removeIf() to collect all messages matching a filter, maximum match count is
- // optional.
- struct Collector {
- const uint32_t maxMatches;
- MessageFilter& filter;
- std::deque<QueuedMessage> matches;
- Collector(MessageFilter& filter, uint32_t max)
- : maxMatches(max), filter(filter) {}
- bool operator() (QueuedMessage& qm)
- {
- if (maxMatches == 0 || matches.size() < maxMatches) {
- if (filter.match( qm )) {
- matches.push_back(qm);
- return true;
- }
- }
- return false;
- }
- };
-
-} // end namespace
-
-
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
@@ -615,77 +459,63 @@ namespace {
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
- *
- * An optional filter can be supplied that will be applied against each message. The
- * message is purged only if the filter matches. See MessageDistributor for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
- const qpid::types::Variant::Map *filter)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
{
- std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- Collector c(*mf.get(), purge_request);
-
Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
- // Update observers and message state:
- observeAcquire(*qmsg, locker);
- dequeue(0, *qmsg);
- // now reroute if necessary
+ uint32_t purge_count = purge_request; // only comes into play if >0
+ std::deque<DeliverableMessage> rerouteQueue;
+
+ uint32_t count = 0;
+ // Either purge them all or just some of them (purge_count) while the queue isn't empty.
+ while((!purge_request || purge_count--) && !messages->empty()) {
if (dest.get()) {
- assert(qmsg->payload);
- DeliverableMessage dmsg(qmsg->payload);
- dest->routeWithAlternate(dmsg);
+ //
+ // If there is a destination exchange, stage the messages onto a reroute queue
+ // so they don't wind up getting purged more than once.
+ //
+ DeliverableMessage msg(messages->front().payload);
+ rerouteQueue.push_back(msg);
}
+ popAndDequeue();
+ count++;
}
- return c.matches.size();
-}
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
- const qpid::types::Variant::Map *filter)
-{
- std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- Collector c(*mf.get(), qty);
+ //
+ // Re-route purged messages into the destination exchange. Note that there's no need
+ // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
+ //
+ while (!rerouteQueue.empty()) {
+ DeliverableMessage msg(rerouteQueue.front());
+ rerouteQueue.pop_front();
+ dest->route(msg, msg.getMessage().getRoutingKey(),
+ msg.getMessage().getApplicationHeaders());
+ }
+
+ return count;
+}
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t count = 0; // count how many were moved for returning
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
- // Update observers and message state:
- observeAcquire(*qmsg, locker);
- dequeue(0, *qmsg);
- // and move to destination Queue.
- assert(qmsg->payload);
- destq->deliver(qmsg->payload);
+ while((!qty || move_count--) && !messages->empty()) {
+ QueuedMessage qmsg = messages->front();
+ boost::intrusive_ptr<Message> msg = qmsg.payload;
+ destq->deliver(msg); // deliver message to the destination queue
+ pop();
+ dequeue(0, qmsg);
+ count++;
}
- return c.matches.size();
+ return count;
}
-/** Acquire the front (oldest) message from the in-memory queue.
- * assumes messageLock held by caller
- */
-void Queue::pop(const Mutex::ScopedLock& locker)
+void Queue::pop()
{
assertClusterSafe();
- QueuedMessage msg;
- if (messages->pop(msg)) {
- observeAcquire(msg, locker);
- ++dequeueSincePurge;
- }
-}
-
-/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const Mutex::ScopedLock& locker)
-{
- if (messages->remove(position, msg)) {
- observeAcquire(msg, locker);
- ++dequeueSincePurge;
- return true;
- }
- return false;
+ messages->pop();
+ ++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -694,15 +524,13 @@ void Queue::push(boost::intrusive_ptr<Me
QueuedMessage removed;
bool dequeueRequired = false;
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
-
+ if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
+
dequeueRequired = messages->push(qm, removed);
- if (dequeueRequired)
- observeAcquire(removed, locker);
listeners.populate(copy);
- observeEnqueue(qm, locker);
+ enqueued(qm);
}
copy.notify();
if (dequeueRequired) {
@@ -718,7 +546,7 @@ void Queue::push(boost::intrusive_ptr<Me
void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- if (message.payload->isIngressComplete()) (*result)++;
+ if (message.payload->isEnqueueComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -778,7 +606,7 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
{
ScopedUse u(barrier);
@@ -792,21 +620,24 @@ bool Queue::enqueue(TransactionContext*
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
+ //copy on write: take deep copy of message before modifying it
+ //as the frames may already be available for delivery on other
+ //threads
+ boost::intrusive_ptr<Message> copy(new Message(*msg));
+ msg = copy;
msg->addTraceId(traceId);
}
if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
- // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
- // when it considers the message stored.
- msg->enqueueAsync(shared_from_this(), store);
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
@@ -823,10 +654,10 @@ bool Queue::enqueue(TransactionContext*
void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ if (policy.get()) policy->enqueueAborted(msg);
}
-// return true if store exists,
+// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
@@ -835,8 +666,8 @@ bool Queue::dequeue(TransactionContext*
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
- observeDequeue(msg, locker);
+ if (!ctxt) {
+ dequeued(msg);
}
}
// This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -856,7 +687,7 @@ bool Queue::dequeue(TransactionContext*
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- observeDequeue(msg, locker);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -864,23 +695,21 @@ void Queue::dequeueCommitted(const Queue
}
/**
- * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
- * it from the logical (and persistent if applicable) queue
+ * Removes a message from the in-memory delivery queue as well as
+ * dequeuing it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue(const Mutex::ScopedLock& held)
+void Queue::popAndDequeue()
{
- if (!messages->empty()) {
- QueuedMessage msg = messages->front();
- pop(held);
- dequeue(0, msg);
- }
+ QueuedMessage msg = messages->front();
+ pop();
+ dequeue(0, msg);
}
/**
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
@@ -893,33 +722,6 @@ void Queue::observeDequeue(const QueuedM
}
}
-/** updates queue observers when a message has become unavailable for transfer,
- * expects messageLock to be held
- */
-void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
-{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->acquired(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
- }
- }
-}
-
-/** updates queue observers when a message has become re-available for transfer,
- * expects messageLock to be held
- */
-void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
-{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->requeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
- }
- }
-}
void Queue::create(const FieldTable& _settings)
{
@@ -927,7 +729,7 @@ void Queue::create(const FieldTable& _se
if (store) {
store->create(*this, _settings);
}
- configureImpl(_settings);
+ configure(_settings);
}
@@ -940,8 +742,8 @@ int getIntegerSetting(const qpid::framin
return v->get<int>();
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
- try {
- return boost::lexical_cast<int>(s);
+ try {
+ return boost::lexical_cast<int>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
return 0;
@@ -952,45 +754,15 @@ int getIntegerSetting(const qpid::framin
}
}
-bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+void Queue::configure(const FieldTable& _settings, bool recovering)
{
- qpid::framing::FieldTable::ValuePtr v = settings.get(key);
- if (!v) {
- return false;
- } else if (v->convertsTo<int>()) {
- return v->get<int>() != 0;
- } else if (v->convertsTo<std::string>()){
- std::string s = v->get<std::string>();
- if (s == "True") return true;
- if (s == "true") return true;
- if (s == "False") return false;
- if (s == "false") return false;
- try {
- return boost::lexical_cast<bool>(s);
- } catch(const boost::bad_lexical_cast&) {
- QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s);
- return false;
- }
- } else {
- QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v);
- return false;
- }
-}
-
-void Queue::configure(const FieldTable& _settings)
-{
- settings = _settings;
- configureImpl(settings);
-}
-void Queue::configureImpl(const FieldTable& _settings)
-{
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
if (eventMode && broker) {
broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
}
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
(!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -1004,43 +776,32 @@ void Queue::configureImpl(const FieldTab
setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
}
if (broker && broker->getManagementAgent()) {
- ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio);
+ ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
}
//set this regardless of owner to allow use of no-local with exclusive consumers also
- noLocal = getBoolSetting(_settings, qpidNoLocal);
+ noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
+ } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
+ } else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
- } else { // default (FIFO) queue type
- // override default message allocator if message groups configured.
- boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
- if (mgm) {
- allocator = mgm;
- addObserver(mgm);
- }
}
}
-
- persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
+
+ persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
@@ -1048,32 +809,32 @@ void Queue::configureImpl(const FieldTab
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
- if (mgmtObject != 0) {
+ if (mgmtObject != 0)
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
- }
- QueueFlowLimit::observe(*this, _settings);
+ if ( isDurable() && ! getPersistenceId() && ! recovering )
+ store->create(*this, _settings);
}
-void Queue::destroyed()
+void Queue::destroy()
{
- unbind(broker->getExchanges());
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
- alternateExchange->routeWithAlternate(msg);
- popAndDequeue(locker);
+ alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
+ msg.getMessage().getApplicationHeaders());
+ popAndDequeue();
}
alternateExchange->decAlternateUsers();
}
@@ -1085,7 +846,6 @@ void Queue::destroyed()
store = 0;//ensure we make no more calls to the store for this queue
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
- notifyDeleted();
}
void Queue::notifyDeleted()
@@ -1105,9 +865,9 @@ void Queue::bound(const string& exchange
bindings.add(exchange, key, args);
}
-void Queue::unbind(ExchangeRegistry& exchanges)
+void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
{
- bindings.unbind(exchanges, shared_from_this());
+ bindings.unbind(exchanges, shared_ref);
}
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -1120,9 +880,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -1136,11 +896,11 @@ void Queue::setPersistenceId(uint64_t _p
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -1154,14 +914,13 @@ uint32_t Queue::encodedSize() const
+ (policy.get() ? (*policy).encodedSize() : 0);
}
-Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
+Queue::shared_ptr Queue::decode ( QueueRegistry& queues, Buffer& buffer, bool recovering )
{
string name;
buffer.getShortString(name);
- FieldTable settings;
- buffer.get(settings);
- boost::shared_ptr<Exchange> alternate;
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true);
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ buffer.get(result.first->settings);
+ result.first->configure(result.first->settings, recovering );
if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
buffer.get ( *(result.first->policy) );
}
@@ -1193,10 +952,11 @@ boost::shared_ptr<Exchange> Queue::getAl
void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
- queue->destroyed();
+ queue->unbind(broker.getExchanges(), queue);
+ queue->destroy();
}
}
@@ -1205,7 +965,7 @@ struct AutoDeleteTask : qpid::sys::Timer
Broker& broker;
Queue::shared_ptr queue;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
: qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
void fire()
@@ -1223,27 +983,27 @@ void Queue::tryAutoDelete(Broker& broker
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
- broker.getClusterTimer().add(queue->autoDeleteTask);
+ broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
tryAutoDeleteImpl(broker, queue);
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -1252,25 +1012,25 @@ bool Queue::setExclusiveOwner(const Owne
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
- if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ if (externalQueueStore!=inst && externalQueueStore)
+ delete externalQueueStore;
externalQueueStore = inst;
if (inst) {
@@ -1295,7 +1055,7 @@ Manageable::status_t Queue::ManagementMe
case _qmf::Queue::METHOD_PURGE :
{
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
- purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
+ purge(purgeArgs.i_request);
status = Manageable::STATUS_OK;
}
break;
@@ -1316,7 +1076,7 @@ Manageable::status_t Queue::ManagementMe
}
}
- purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
+ purge(rerouteArgs.i_request, dest);
status = Manageable::STATUS_OK;
}
break;
@@ -1325,14 +1085,6 @@ Manageable::status_t Queue::ManagementMe
return status;
}
-
-void Queue::query(qpid::types::Variant::Map& results) const
-{
- Mutex::ScopedLock locker(messageLock);
- /** @todo add any interesting queue state into results */
- if (allocator) allocator->query(results);
-}
-
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
@@ -1367,10 +1119,7 @@ void Queue::insertSequenceNumbers(const
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
-/** updates queue observers and state when a message has become available for transfer,
- * expects messageLock to be held
- */
-void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
+void Queue::enqueued(const QueuedMessage& m)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1393,8 +1142,7 @@ void Queue::updateEnqueued(const QueuedM
if (policy.get()) {
policy->recoverEnqueued(payload);
}
- Mutex::ScopedLock locker(messageLock);
- observeEnqueue(m, locker);
+ enqueued(m);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1418,7 +1166,6 @@ void Queue::checkNotDeleted()
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
- Mutex::ScopedLock locker(messageLock);
observers.insert(observer);
}
@@ -1428,32 +1175,6 @@ void Queue::flush()
if (u.acquired && store) store->flush(*this);
}
-
-bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
- const qpid::framing::FieldTable& arguments)
-{
- if (exchange->bind(shared_from_this(), key, &arguments)) {
- bound(exchange->getName(), key, arguments);
- if (exchange->isDurable() && isDurable()) {
- store->bind(*exchange, *this, key, arguments);
- }
- return true;
- } else {
- return false;
- }
-}
-
-
-const Broker* Queue::getBroker()
-{
- return broker;
-}
-
-void Queue::setDequeueSincePurge(uint32_t value) {
- dequeueSincePurge = value;
-}
-
-
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Queue.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
@@ -59,7 +59,7 @@ class MessageStore;
class QueueEvents;
class QueueRegistry;
class TransactionContext;
-class MessageDistributor;
+class Exchange;
/**
* The brokers representation of an amqp queue. Messages are
@@ -74,13 +74,13 @@ class Queue : public boost::enable_share
{
Queue& parent;
uint count;
-
+
UsageBarrier(Queue&);
bool acquire();
void release();
void destroy();
};
-
+
struct ScopedUse
{
UsageBarrier& barrier;
@@ -88,7 +88,7 @@ class Queue : public boost::enable_share
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
-
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -119,7 +119,7 @@ class Queue : public boost::enable_share
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
+ RateTracker dequeueTracker;
int eventMode;
Observers observers;
bool insertSeqNo;
@@ -129,36 +129,26 @@ class Queue : public boost::enable_share
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
- boost::shared_ptr<MessageDistributor> allocator;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+ bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
+ bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
void notifyListener();
void removeListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- /** update queue observers, stats, policy, etc when the messages' state changes. Lock
- * must be held by caller */
- void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-
- /** modify the Queue's message container - assumes messageLock held */
- void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
- void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
- // acquire message @ position, return true and set msg if acquire succeeds
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const sys::Mutex::ScopedLock& held);
-
+ void enqueued(const QueuedMessage& msg);
+ void dequeued(const QueuedMessage& msg);
+ void pop();
+ void popAndDequeue();
+ QueuedMessage getFront();
void forcePersistent(QueuedMessage& msg);
int getEventMode();
- void configureImpl(const qpid::framing::FieldTable& settings);
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -182,9 +172,8 @@ class Queue : public boost::enable_share
}
}
}
-
+
void checkNotDeleted();
- void notifyDeleted();
public:
@@ -193,50 +182,29 @@ class Queue : public boost::enable_share
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
QPID_BROKER_EXTERN ~Queue();
- /** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
- /** allow the Consumer to acquire a message that it has browsed.
- * @param msg - message to be acquired.
- * @return false if message is no longer available for acquire.
- */
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
-
- /**
- * Used to configure a new queue and create a persistent record
- * for it in store if required.
- */
- QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
+ void create(const qpid::framing::FieldTable& settings);
- /**
- * Used to reconfigure a recovered queue (does not create
- * persistent record in store).
- */
- QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
- void destroyed();
+ // "recovering" means we are doing a MessageStore recovery.
+ QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
+ bool recovering = false);
+ void destroy();
+ void notifyDeleted();
QPID_BROKER_EXTERN void bound(const std::string& exchange,
const std::string& key,
const qpid::framing::FieldTable& args);
- //TODO: get unbind out of the public interface; only there for purposes of one unit test
- QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges);
- /**
- * Bind self to specified exchange, and record that binding for unbinding on delete.
- */
- bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
- const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
+ QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
+ Queue::shared_ptr shared_ref);
- /** Acquire the message at the given position if it is available for acquire. Not to
- * be used by clients, but used by the broker for queue management.
- * @param message - set to the acquired message if true returned.
- * @return true if the message has been acquired.
- */
+ QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
/**
@@ -265,14 +233,11 @@ class Queue : public boost::enable_share
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
- boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
- const ::qpid::types::Variant::Map *filter=0);
- QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ QPID_BROKER_EXTERN void purgeExpired();
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
- const qpid::types::Variant::Map *filter=0);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -311,8 +276,8 @@ class Queue : public boost::enable_share
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void updateEnqueued(const QueuedMessage& msg);
/**
@@ -323,14 +288,14 @@ class Queue : public boost::enable_share
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Acquires the next available (oldest) message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
- /** Get the message at position pos, returns true if found and sets msg */
- QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
+ /** Get the message at position pos */
+ QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
const QueuePolicy* getPolicy();
@@ -344,13 +309,8 @@ class Queue : public boost::enable_share
void encode(framing::Buffer& buffer) const;
uint32_t encodedSize() const;
- /**
- * Restores a queue from encoded data (used in recovery)
- *
- * Note: restored queue will be neither auto-deleted or have an
- * exclusive owner
- */
- static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
+ // "recovering" means we are doing a MessageStore recovery.
+ static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false );
static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -359,7 +319,6 @@ class Queue : public boost::enable_share
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void query(::qpid::types::Variant::Map&) const;
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
@@ -372,11 +331,6 @@ class Queue : public boost::enable_share
bindings.eachBinding(f);
}
- /** Apply f to each Observer on the queue */
- template <class F> void eachObserver(F f) {
- std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
- }
-
/** Set the position sequence number for the next message on the queue.
* Must be >= the current sequence number.
* Used by cluster to replicate queues.
@@ -404,11 +358,6 @@ class Queue : public boost::enable_share
void recoverPrepared(boost::intrusive_ptr<Message>& msg);
void flush();
-
- const Broker* getBroker();
-
- uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
- void setDequeueSincePurge(uint32_t value);
};
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
QueueCleaner::~QueueCleaner()
{
@@ -36,16 +36,10 @@ QueueCleaner::~QueueCleaner()
void QueueCleaner::start(qpid::sys::Duration p)
{
- period = p;
task = new Task(*this, p);
- timer->add(task);
+ timer.add(task);
}
-void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
- this->timer = timer;
-}
-
-
QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
void QueueCleaner::Task::fire()
@@ -71,9 +65,9 @@ void QueueCleaner::fired()
std::vector<Queue::shared_ptr> copy;
CollectQueues collect(&copy);
queues.eachQueue(collect);
- std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
+ std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
task->setupNextFire();
- timer->add(task);
+ timer.add(task);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueCleaner.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,15 +35,14 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
QPID_BROKER_EXTERN ~QueueCleaner();
- QPID_BROKER_EXTERN void start(sys::Duration period);
- QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
+ QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
private:
class Task : public sys::TimerTask
{
public:
- Task(QueueCleaner& parent, sys::Duration duration);
+ Task(QueueCleaner& parent, qpid::sys::Duration duration);
void fire();
private:
QueueCleaner& parent;
@@ -51,8 +50,7 @@ class QueueCleaner
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- sys::Timer* timer;
- sys::Duration period;
+ sys::Timer& timer;
void fired();
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueEvents.cpp Fri Oct 21 01:19:00 2011
@@ -129,10 +129,6 @@ class EventGenerator : public QueueObser
{
if (!enqueueOnly) manager.dequeued(m);
}
-
- void acquired(const QueuedMessage&) {};
- void requeued(const QueuedMessage&) {};
-
private:
QueueEvents& manager;
const bool enqueueOnly;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.cpp Fri Oct 21 01:19:00 2011
@@ -26,25 +26,19 @@ namespace broker {
void QueueListeners::addListener(Consumer::shared_ptr c)
{
- if (!c->inListeners) {
- if (c->acquires) {
- add(consumers, c);
- } else {
- add(browsers, c);
- }
- c->inListeners = true;
+ if (c->preAcquires()) {
+ add(consumers, c);
+ } else {
+ add(browsers, c);
}
}
void QueueListeners::removeListener(Consumer::shared_ptr c)
{
- if (c->inListeners) {
- if (c->acquires) {
- remove(consumers, c);
- } else {
- remove(browsers, c);
- }
- c->inListeners = false;
+ if (c->preAcquires()) {
+ remove(consumers, c);
+ } else {
+ remove(browsers, c);
}
}
@@ -52,20 +46,18 @@ void QueueListeners::populate(Notificati
{
if (consumers.size()) {
set.consumer = consumers.front();
- consumers.pop_front();
- set.consumer->inListeners = false;
+ consumers.erase(consumers.begin());
} else {
- // Don't swap the deques, hang on to the memory allocated.
+ // Don't swap the vectors, hang on to the memory allocated.
set.browsers = browsers;
browsers.clear();
- for (Listeners::iterator i = set.browsers.begin(); i != set.browsers.end(); i++)
- (*i)->inListeners = false;
}
}
void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c)
{
- listeners.push_back(c);
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i == listeners.end()) listeners.push_back(c);
}
void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c)
@@ -81,7 +73,9 @@ void QueueListeners::NotificationSet::no
}
bool QueueListeners::contains(Consumer::shared_ptr c) const {
- return c->inListeners;
+ return
+ std::find(browsers.begin(), browsers.end(), c) != browsers.end() ||
+ std::find(consumers.begin(), consumers.end(), c) != consumers.end();
}
void QueueListeners::ListenerSet::notifyAll()
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueListeners.h Fri Oct 21 01:19:00 2011
@@ -22,7 +22,7 @@
*
*/
#include "qpid/broker/Consumer.h"
-#include <deque>
+#include <vector>
namespace qpid {
namespace broker {
@@ -40,7 +40,7 @@ namespace broker {
class QueueListeners
{
public:
- typedef std::deque<Consumer::shared_ptr> Listeners;
+ typedef std::vector<Consumer::shared_ptr> Listeners;
class NotificationSet
{
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueObserver.h Fri Oct 21 01:19:00 2011
@@ -24,52 +24,18 @@
namespace qpid {
namespace broker {
-struct QueuedMessage;
-class Consumer;
-
+class QueuedMessage;
/**
- * Interface for notifying classes who want to act as 'observers' of a queue of particular
- * events.
- *
- * The events that are monitored reflect the relationship between a particular message and
- * the queue it has been delivered to. A message can be considered in one of three states
- * with respect to the queue:
- *
- * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
- *
- * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
- * (by either browse or acquire), but still considered on the queue.
- *
- * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
- *
- * The queue events that are observable are:
- *
- * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
- * (e.g. browse or acquire)
- *
- * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available
- * for other consumers to browse or acquire, but it is not yet considered dequeued as it
- * may be requeued by the consumer.
- *
- * "Requeued" - a previously-acquired message is released by its owner: it is put back on
- * the queue at its original position and returns to the "Available" state.
- *
- * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks
- * the message, and the broker considers the consumer's transaction complete.
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
*/
class QueueObserver
{
public:
virtual ~QueueObserver() {}
-
- // note: the Queue will hold the messageLock while calling these methods!
virtual void enqueued(const QueuedMessage&) = 0;
virtual void dequeued(const QueuedMessage&) = 0;
- virtual void acquired(const QueuedMessage&) = 0;
- virtual void requeued(const QueuedMessage&) = 0;
- virtual void consumerAdded( const Consumer& ) {};
- virtual void consumerRemoved( const Consumer& ) {};
- private:
+ private:
};
}} // namespace qpid::broker
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.cpp Fri Oct 21 01:19:00 2011
@@ -117,30 +117,30 @@ void QueuePolicy::update(FieldTable& set
settings.setString(typeKey, type);
}
-template <typename T>
-T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue)
+uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
{
FieldTable::ValuePtr v = settings.get(key);
- T result = 0;
+ int32_t result = 0;
if (!v) return defaultValue;
if (v->getType() == 0x23) {
QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
} else if (v->getType() == 0x33) {
QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<T>()) {
- result = v->get<T>();
+ } else if (v->convertsTo<int>()) {
+ result = v->get<int>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
} else if (v->convertsTo<string>()) {
string s(v->get<string>());
QPID_LOG(debug, "Got string value for " << key << ": " << s);
std::istringstream convert(s);
- if (convert >> result && result >= 0 && convert.eof()) return result;
+ if (convert >> result && result >= 0) return result;
}
- throw IllegalArgumentException(QPID_MSG("Cannot convert " << key << " to unsigned integer: " << *v));
+ QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
+ return defaultValue;
}
std::string QueuePolicy::getType(const FieldTable& settings)
@@ -247,7 +247,7 @@ bool RingQueuePolicy::checkLimit(boost::
{
// If the message is bigger than the queue size, give up
- if (getMaxSize() && m->contentSize() > getMaxSize()) {
+ if (m->contentSize() > getMaxSize()) {
QPID_LOG(debug, "Message too large for ring queue " << name
<< " [" << *this << "] "
<< ": message size = " << m->contentSize() << " bytes"
@@ -269,7 +269,8 @@ bool RingQueuePolicy::checkLimit(boost::
do {
QueuedMessage oldest = queue.front();
- if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
+
+ if (oldest.queue->acquire(oldest) || !strict) {
queue.pop_front();
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
@@ -319,8 +320,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
{
- uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0);
- uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize);
+ uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
+ uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueuePolicy.h Fri Oct 21 01:19:00 2011
@@ -43,7 +43,8 @@ class QueuePolicy
uint32_t count;
uint64_t size;
bool policyExceeded;
-
+
+ static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
protected:
uint64_t getCurrentQueueSize() const { return size; }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.cpp Fri Oct 21 01:19:00 2011
@@ -21,7 +21,6 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueEvents.h"
-#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
#include <sstream>
#include <assert.h>
@@ -37,13 +36,7 @@ QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
- bool autoDelete, const OwnershipToken* owner,
- boost::shared_ptr<Exchange> alternate,
- const qpid::framing::FieldTable& arguments,
- bool recovering/*true if this declare is a
- result of recovering queue
- definition from persistente
- record*/)
+ bool autoDelete, const OwnershipToken* owner)
{
RWlock::ScopedWlock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
@@ -52,17 +45,6 @@ QueueRegistry::declare(const string& dec
if (i == queues.end()) {
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
- if (alternate) {
- queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
- if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
- } else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
- }
queues[name] = queue;
if (lastNode) queue->setLastNodeFailure();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/QueueRegistry.h Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
-#include "qpid/framing/FieldTable.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <algorithm>
@@ -35,7 +34,6 @@ namespace broker {
class Queue;
class QueueEvents;
-class Exchange;
class OwnershipToken;
class Broker;
class MessageStore;
@@ -62,10 +60,7 @@ class QueueRegistry {
const std::string& name,
bool durable = false,
bool autodelete = false,
- const OwnershipToken* owner = 0,
- boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
- const qpid::framing::FieldTable& args = framing::FieldTable(),
- bool recovering = false);
+ const OwnershipToken* owner = 0);
/**
* Destroy the named queue.
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredDequeue.cpp Fri Oct 21 01:19:00 2011
@@ -43,6 +43,7 @@ void RecoveredDequeue::commit() throw()
void RecoveredDequeue::rollback() throw()
{
+ msg->enqueueComplete();
queue->process(msg);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveredEnqueue.cpp Fri Oct 21 01:19:00 2011
@@ -36,6 +36,7 @@ bool RecoveredEnqueue::prepare(Transacti
}
void RecoveredEnqueue::commit() throw(){
+ msg->enqueueComplete();
queue->process(msg);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Oct 21 01:19:00 2011
@@ -113,7 +113,7 @@ RecoverableExchange::shared_ptr Recovery
RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
{
- Queue::shared_ptr queue = Queue::restore(queues, buffer);
+ Queue::shared_ptr queue = Queue::decode(queues, buffer, true);
try {
Exchange::shared_ptr exchange = exchanges.getDefault();
if (exchange) {
@@ -252,6 +252,7 @@ void RecoverableMessageImpl::dequeue(Dtx
void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
{
+ msg->enqueueComplete(); // recoved nmessage to enqueued in store already
buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.cpp Fri Oct 21 01:19:00 2011
@@ -30,7 +30,6 @@
#include <boost/format.hpp>
#if HAVE_SASL
-#include <sys/stat.h>
#include <sasl/sasl.h>
#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
using qpid::sys::cyrus::CyrusSecurityLayer;
@@ -58,7 +57,7 @@ public:
NullAuthenticator(Connection& connection, bool encrypt);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string* response);
+ void start(const std::string& mechanism, const std::string& response);
void step(const std::string&) {}
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
@@ -82,7 +81,7 @@ public:
~CyrusAuthenticator();
void init();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string* response);
+ void start(const std::string& mechanism, const std::string& response);
void step(const std::string& response);
void getError(std::string& error);
void getUid(std::string& uid) { getUsername(uid); }
@@ -99,33 +98,11 @@ void SaslAuthenticator::init(const std::
// Check if we have a version of SASL that supports sasl_set_path()
#if (SASL_VERSION_FULL >= ((2<<16)|(1<<8)|22))
// If we are not given a sasl path, do nothing and allow the default to be used.
- if ( saslConfigPath.empty() ) {
- QPID_LOG ( info, "SASL: no config path set - using default." );
- }
- else {
- struct stat st;
-
- // Make sure the directory exists and we can read up to it.
- if ( ::stat ( saslConfigPath.c_str(), & st) ) {
- // Note: not using strerror() here because I think its messages are a little too hazy.
- if ( errno == ENOENT )
- throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: no such directory: " << saslConfigPath ) );
- if ( errno == EACCES )
- throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot read parent of: " << saslConfigPath ) );
- // catch-all stat failure
- throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot stat: " << saslConfigPath ) );
- }
-
- // Make sure the directory is readable.
- if ( ::access ( saslConfigPath.c_str(), R_OK ) ) {
- throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: directory not readable:" << saslConfigPath ) );
- }
-
- // This shouldn't fail now, but check anyway.
- int code = sasl_set_path(SASL_PATH_TYPE_CONFIG, const_cast<char *>(saslConfigPath.c_str()));
+ if ( ! saslConfigPath.empty() ) {
+ int code = sasl_set_path(SASL_PATH_TYPE_CONFIG,
+ const_cast<char *>(saslConfigPath.c_str()));
if(SASL_OK != code)
throw Exception(QPID_MSG("SASL: sasl_set_path failed [" << code << "] " ));
-
QPID_LOG(info, "SASL: config path set to " << saslConfigPath );
}
#endif
@@ -187,7 +164,7 @@ void NullAuthenticator::getMechanisms(Ar
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing
}
-void NullAuthenticator::start(const string& mechanism, const string* response)
+void NullAuthenticator::start(const string& mechanism, const string& response)
{
if (encrypt) {
#if HAVE_SASL
@@ -203,16 +180,16 @@ void NullAuthenticator::start(const stri
}
}
if (mechanism == "PLAIN") { // Old behavior
- if (response && response->size() > 0) {
+ if (response.size() > 0) {
string uid;
- string::size_type i = response->find((char)0);
- if (i == 0 && response->size() > 1) {
+ string::size_type i = response.find((char)0);
+ if (i == 0 && response.size() > 1) {
//no authorization id; use authentication id
- i = response->find((char)0, 1);
- if (i != string::npos) uid = response->substr(1, i-1);
+ i = response.find((char)0, 1);
+ if (i != string::npos) uid = response.substr(1, i-1);
} else if (i != string::npos) {
//authorization id is first null delimited field
- uid = response->substr(0, i);
+ uid = response.substr(0, i);
}//else not a valid SASL PLAIN response, throw error?
if (!uid.empty()) {
//append realm if it has not already been added
@@ -399,22 +376,18 @@ void CyrusAuthenticator::getMechanisms(A
}
}
-void CyrusAuthenticator::start(const string& mechanism, const string* response)
+void CyrusAuthenticator::start(const string& mechanism, const string& response)
{
const char *challenge;
unsigned int challenge_len;
- // This should be at same debug level as mech list in getMechanisms().
- QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
+ QPID_LOG(debug, "SASL: Starting authentication with mechanism: " << mechanism);
int code = sasl_server_start(sasl_conn,
mechanism.c_str(),
- (response ? response->c_str() : 0), (response ? response->size() : 0),
+ response.c_str(), response.length(),
&challenge, &challenge_len);
processAuthenticationStep(code, challenge, challenge_len);
- qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
- cnxMgmt->set_saslMechanism(mechanism);
}
void CyrusAuthenticator::step(const string& response)
@@ -451,12 +424,10 @@ void CyrusAuthenticator::processAuthenti
client.secure(challenge_str);
} else {
std::string uid;
- //save error detail before trying to retrieve username as error in doing so will overwrite it
- std::string errordetail = sasl_errdetail(sasl_conn);
if (!getUsername(uid)) {
- QPID_LOG(info, "SASL: Authentication failed (no username available yet):" << errordetail);
+ QPID_LOG(info, "SASL: Authentication failed (no username available):" << sasl_errdetail(sasl_conn));
} else {
- QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << errordetail);
+ QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << sasl_errdetail(sasl_conn));
}
// TODO: Change to more specific exceptions, when they are
@@ -488,9 +459,6 @@ std::auto_ptr<SecurityLayer> CyrusAuthen
if (ssf) {
securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
}
- qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
- cnxMgmt->set_saslSsf(ssf);
return securityLayer;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SaslAuthenticator.h Fri Oct 21 01:19:00 2011
@@ -41,7 +41,7 @@ class SaslAuthenticator
public:
virtual ~SaslAuthenticator() {}
virtual void getMechanisms(framing::Array& mechanisms) = 0;
- virtual void start(const std::string& mechanism, const std::string* response) = 0;
+ virtual void start(const std::string& mechanism, const std::string& response) = 0;
virtual void step(const std::string& response) = 0;
virtual void getUid(std::string&) {}
virtual bool getUsername(std::string&) { return false; };
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org