You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/09/28 14:12:42 UTC
svn commit: r819505 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp
qpid/broker/Queue.h qpid/broker/QueuePolicy.cpp qpid/broker/QueuePolicy.h
tests/ring_queue_test
Author: gsim
Date: Mon Sep 28 12:12:41 2009
New Revision: 819505
URL: http://svn.apache.org/viewvc?rev=819505&view=rev
Log:
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes)
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
qpid/trunk/qpid/cpp/src/tests/ring_queue_test
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Sep 28 12:12:41 2009
@@ -208,11 +208,10 @@
}
void Queue::requeue(const QueuedMessage& msg){
- if (!isEnqueued(msg)) return;
-
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
+ if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
@@ -603,7 +602,6 @@
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
if (policy.get()) {
- Mutex::ScopedUnlock locker(messageLock);
policy->enqueued(qm);
}
}
@@ -696,7 +694,14 @@
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
if (policy.get() && !suppressPolicyCheck) {
- policy->tryEnqueue(msg);
+ Messages dequeues;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ policy->tryEnqueue(msg);
+ policy->getPendingDequeues(dequeues);
+ }
+ //depending on policy, may have some dequeues that need to performed without holding the lock
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
@@ -1072,10 +1077,4 @@
return !policy.get() || policy->isEnqueued(msg);
}
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
- //assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
-}
-
QueueListeners& Queue::getListeners() { return listeners; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Sep 28 12:12:41 2009
@@ -334,18 +334,6 @@
*/
void recoveryComplete();
- /**
- * This is a hack to avoid deadlocks in durable ring
- * queues. It is used for dequeueing messages in response
- * to an enqueue while avoid holding lock over call to
- * store.
- *
- * Assumes messageLock is held - true for curent use case
- * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
- * method
- **/
- void addPendingDequeue(const QueuedMessage &msg);
-
// For cluster update
QueueListeners& getListeners();
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Mon Sep 28 12:12:41 2009
@@ -77,7 +77,6 @@
void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
if (checkLimit(m)) {
enqueued(m->contentSize());
} else {
@@ -87,13 +86,11 @@
void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
enqueued(m->contentSize());
}
void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m->contentSize());
}
@@ -101,7 +98,6 @@
void QueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m.payload->contentSize());
}
@@ -141,6 +137,7 @@
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -200,14 +197,12 @@
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -217,7 +212,6 @@
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
@@ -241,8 +235,6 @@
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
<< ": removed message " << oldest.position << " to make way for new message");
- qpid::sys::Mutex::ScopedUnlock u(lock);
- oldest.queue->dequeue(0, oldest);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in " << name
@@ -254,6 +246,11 @@
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Mon Sep 28 12:12:41 2009
@@ -47,6 +47,7 @@
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
public:
+ typedef std::deque<QueuedMessage> Messages;
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
@@ -68,7 +69,7 @@
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
-
+ virtual void getPendingDequeues(Messages& result);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
@@ -80,7 +81,6 @@
const QueuePolicy&);
protected:
const std::string name;
- qpid::sys::Mutex lock;
QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
@@ -105,8 +105,8 @@
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void getPendingDequeues(Messages& result);
private:
- typedef std::deque<QueuedMessage> Messages;
Messages pendingDequeues;
Messages queue;
const bool strict;
Modified: qpid/trunk/qpid/cpp/src/tests/ring_queue_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ring_queue_test?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ring_queue_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ring_queue_test Mon Sep 28 12:12:41 2009
@@ -48,7 +48,7 @@
cleanup() {
rm -f sender_${QUEUE_NAME}_* receiver_${QUEUE_NAME}_*
- qpid-config $BROKER_URL add queue $QUEUE_NAME
+ qpid-config $BROKER_URL del queue $QUEUE_NAME --force
}
log() {
@@ -64,10 +64,11 @@
if [[ $RECEIVERS -eq 0 ]]; then
#queue should have $LIMIT messages on it, but need to send an eos also
sender --routing-key $QUEUE_NAME --send-eos 1 < /dev/null
- if [[ $(receiver --queue $QUEUE_NAME --browse | wc -l) -eq $(( $LIMIT - 1)) ]]; then
+ received=$(receiver --queue $QUEUE_NAME --browse | wc -l)
+ if [[ received -eq $(( $LIMIT - 1)) ]]; then
log "queue contains $LIMIT messages as expected"
else
- fail "queue does not contain the expected $LIMIT messages"
+ fail "queue does not contain the expected $LIMIT messages (received $received)"
fi
elif [[ $CONCURRENT -eq 0 ]]; then
#sum of length of all output files should be equal to $LIMIT - $RECEIVERS (1 eos message each)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org