You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/06/23 12:46:16 UTC
svn commit: r787625 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp
Queue.h QueuePolicy.cpp QueuePolicy.h
Author: gsim
Date: Tue Jun 23 10:46:15 2009
New Revision: 787625
URL: http://svn.apache.org/viewvc?rev=787625&view=rev
Log:
QPID-1936: Fix potential deadlock for durable ring queue
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 23 10:46:15 2009
@@ -551,11 +551,16 @@
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
+ Messages dequeues;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) policy->tryEnqueue(qm);
+ if (policy.get()) {
+ policy->tryEnqueue(qm);
+ //depending on policy, may have some dequeues
+ if (!isRecovery) pendingDequeues.swap(dequeues);
+ }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -591,6 +596,10 @@
}
}
copy.notify();
+ if (!dequeues.empty()) {
+ //depending on policy, may have some dequeues
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ }
}
QueuedMessage Queue::getFront()
@@ -1026,4 +1035,10 @@
return !policy.get() || policy->isEnqueued(msg);
}
+void Queue::addPendingDequeue(const QueuedMessage& msg)
+{
+ //assumes lock is held - true at present but rather nasty as this is a public method
+ pendingDequeues.push_back(msg);
+}
+
QueueListeners& Queue::getListeners() { return listeners; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 23 10:46:15 2009
@@ -326,6 +326,18 @@
*/
void recoveryComplete();
+ /**
+ * This is a hack to avoid deadlocks in durable ring
+ * queues. It is used for dequeueing messages in response
+ * to an enqueue while avoiding holding the lock over the call to the
+ * store.
+ *
+ * Assumes messageLock is held - true for current use case
+ * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
+ * method
+ **/
+ void addPendingDequeue(const QueuedMessage &msg);
+
// For cluster update
QueueListeners& getListeners();
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Jun 23 10:46:15 2009
@@ -207,13 +207,9 @@
{
qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
- for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
- if (i->payload == m.payload) {
- queue.erase(i);
- //now update count and size
- QueuePolicy::dequeued(m);
- break;
- }
+ if (find(m, pendingDequeues, true) || find(m, queue, true)) {
+ //now update count and size
+ QueuePolicy::dequeued(m);
}
}
@@ -223,12 +219,7 @@
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
- for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
- if (i->payload == m.payload) {
- return true;
- }
- }
- return false;
+ return find(m, pendingDequeues, false) || find(m, queue, false);
}
bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
@@ -248,7 +239,18 @@
oldest = queue.front();
}
if (oldest.queue->acquire(oldest) || !strict) {
- oldest.queue->dequeue(0, oldest);
+ {
+ //TODO: fix this! In the current code, this method is
+ //only ever called with the Queue lock already taken. This
+ //should not be relied upon going forward however and
+ //clearly the locking in this class is insufficient as
+ //there is no guarantee that the message previously at the
+ //front is still there.
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ }
+ oldest.queue->addPendingDequeue(oldest);
QPID_LOG(debug, "Ring policy triggered in queue "
<< (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< ": removed message " << oldest.position << " to make way for " << m.position);
@@ -264,6 +266,17 @@
}
}
+bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
+{
+ for (Messages::iterator i = q.begin(); i != q.end(); i++) {
+ if (i->payload == m.payload) {
+ if (remove) q.erase(i);
+ return true;
+ }
+ }
+ return false;
+}
+
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
{
uint32_t maxCount = getInt(settings, maxCountKey, 0);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Tue Jun 23 10:46:15 2009
@@ -101,8 +101,11 @@
private:
typedef std::deque<QueuedMessage> Messages;
qpid::sys::Mutex lock;
+ Messages pendingDequeues;
Messages queue;
const bool strict;
+
+ bool find(const QueuedMessage&, Messages&, bool remove);
};
}}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org