You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/31 20:43:09 UTC
svn commit: r1441158 - in /qpid/trunk/qpid/cpp/src/qpid/ha: QueueGuard.cpp
QueueGuard.h ReplicatingSubscription.h
Author: aconway
Date: Thu Jan 31 19:43:09 2013
New Revision: 1441158
URL: http://svn.apache.org/viewvc?rev=1441158&view=rev
Log:
QPID-4555: HA Fix race condition in QueueGuard
- If cancelled could delay a message without recording it.
- Make all actions involving the delayed set and the AsyncCompletion atomic.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1441158&r1=1441157&r2=1441158&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Jan 31 19:43:09 2013
@@ -67,44 +67,30 @@ QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const Message& m) {
// Delay completion
QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return; // Don't record enqueues after we are cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
- {
- Mutex::ScopedLock l(lock);
- if (cancelled) return; // Don't record enqueues after we are cancelled.
- assert(delayed.find(m.getSequence()) == delayed.end());
- delayed[m.getSequence()] = m.getIngressCompletion();
- }
}
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
QPID_LOG(trace, logPrefix << "Dequeued " << m);
- ReplicatingSubscription* rs=0;
- {
- Mutex::ScopedLock l(lock);
- rs = subscription;
- }
- if (rs) rs->dequeued(m);
- complete(m.getSequence());
-}
-
-void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
- for (Delayed::iterator i = begin; i != end; ++i) {
- QPID_LOG(trace, logPrefix << "Completed " << i->first);
- i->second->finishCompleter();
- }
+ Mutex::ScopedLock l(lock);
+ if (subscription) subscription->dequeued(m);
+ complete(m.getSequence(), l);
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- if (cancelled) return;
- cancelled = true;
- delayed.swap(removed);
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -115,37 +101,34 @@ void QueueGuard::attach(ReplicatingSubsc
bool QueueGuard::subscriptionStart(SequenceNumber position) {
// Complete any messages before or at the ReplicatingSubscription start position.
// Those messages are already on the backup.
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- Delayed::iterator i = delayed.begin();
- while(i != delayed.end() && i->first <= position) {
- removed.insert(*i);
- delayed.erase(i++);
- }
+ Mutex::ScopedLock l(lock);
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
return position >= range.back;
}
void QueueGuard::complete(SequenceNumber sequence) {
- boost::intrusive_ptr<broker::AsyncCompletion> m;
- {
- Mutex::ScopedLock l(lock);
- // The same message can be completed twice, by
- // ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the map so we only call finishCompleter() once
- Delayed::iterator i = delayed.find(sequence);
- if (i != delayed.end()) {
- m = i->second;
- delayed.erase(i);
- }
+ Mutex::ScopedLock l(lock);
+ complete(sequence, l);
+}
+void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+ // The same message can be completed twice, by
+ // ReplicatingSubscription::acknowledged and dequeued. Remove it
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ complete(i, l);
+ delayed.erase(i);
}
- if (m) {
- QPID_LOG(trace, logPrefix << "Completed " << sequence);
- m->finishCompleter();
- }
+}
+
+void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
}
}} // namespaces qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1441158&r1=1441157&r2=1441158&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Thu Jan 31 19:43:09 2013
@@ -54,6 +54,9 @@ class ReplicatingSubscription;
* THREAD SAFE: Concurrent calls:
* - enqueued() via QueueObserver in arbitrary connection threads.
* - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ *
+ * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
+ * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class QueueGuard {
public:
@@ -104,18 +107,20 @@ class QueueGuard {
private:
class QueueObserver;
+ typedef std::map<framing::SequenceNumber,
+ boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+
+ void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+ void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
sys::Mutex lock;
bool cancelled;
std::string logPrefix;
broker::Queue& queue;
- typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
Delayed delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
QueueRange range;
-
- void completeRange(Delayed::iterator begin, Delayed::iterator end);
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1441158&r1=1441157&r2=1441158&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Jan 31 19:43:09 2013
@@ -61,6 +61,8 @@ class QueueGuard;
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
+ * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
+ * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org