You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2018/10/17 23:01:00 UTC
qpid-cpp git commit: QPID-8209: race in queue deletion causes
infinite recursion in autodelete
Repository: qpid-cpp
Updated Branches:
refs/heads/master 0ba019b73 -> f20220f70
QPID-8209: race in queue deletion causes infinite recursion in autodelete
Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/f20220f7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/f20220f7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/f20220f7
Branch: refs/heads/master
Commit: f20220f70fcaf8c6b7efb6e24b35b033aaba5a32
Parents: 0ba019b
Author: Cliff Jansen <cl...@apache.org>
Authored: Wed Oct 17 15:55:49 2018 -0700
Committer: Cliff Jansen <cl...@apache.org>
Committed: Wed Oct 17 15:55:49 2018 -0700
----------------------------------------------------------------------
src/qpid/broker/Broker.cpp | 52 +++++++++++++++++-------------
src/qpid/broker/Broker.h | 6 ++++
src/qpid/broker/Queue.cpp | 24 ++++++--------
src/qpid/broker/Queue.h | 3 +-
src/qpid/broker/QueueRegistry.cpp | 24 +++++++++-----
src/qpid/broker/QueueRegistry.h | 4 +--
src/qpid/broker/SelfDestructQueue.cpp | 2 +-
src/qpid/broker/amqp/Session.cpp | 2 +-
src/tests/QueueRegistryTest.cpp | 2 +-
9 files changed, 69 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Broker.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index 272219c..d214994 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -1473,36 +1473,44 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
return result;
}
-void Broker::deleteQueue(const std::string& name, const std::string& userId,
+void Broker::deleteQueue(boost::shared_ptr<Queue> queue, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
+ const std::string& name = queue->getName();
QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
);
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ boost::shared_ptr<Exchange> altEx = queue->getAlternateExchange();
+ params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" ));
+ params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy()));
+
+ if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,¶ms) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+ }
+ if (check) check(queue);
+ if (acl)
+ acl->recordDestroyQueue(name);
+ Queue::shared_ptr peerQ(queue->getRedirectPeer());
+ if (peerQ)
+ queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
+ queue->isRedirectSource() ? peerQ : queue,
+ false);
+ queues.destroy(queue, connectionId, userId);
+
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+ const std::string& connectionId, QueueFunctor check)
+{
Queue::shared_ptr queue = queues.find(name);
if (queue) {
- if (acl) {
- std::map<acl::Property, std::string> params;
- boost::shared_ptr<Exchange> altEx = queue->getAlternateExchange();
- params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" ));
- params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy()));
-
- if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,¶ms) )
- throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
- }
- if (check) check(queue);
- if (acl)
- acl->recordDestroyQueue(name);
- Queue::shared_ptr peerQ(queue->getRedirectPeer());
- if (peerQ)
- queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
- queue->isRedirectSource() ? peerQ : queue,
- false);
- queues.destroy(name, connectionId, userId);
+ deleteQueue(queue, userId, connectionId, check);
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Broker.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h
index 3df28b0..6a7cc8b 100644
--- a/src/qpid/broker/Broker.h
+++ b/src/qpid/broker/Broker.h
@@ -279,6 +279,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
const std::string& userId,
const std::string& connectionId);
+ void deleteQueue(
+ boost::shared_ptr<Queue> queue,
+ const std::string& userId,
+ const std::string& connectionId,
+ QueueFunctor check = QueueFunctor());
+
QPID_BROKER_EXTERN void deleteQueue(
const std::string& name,
const std::string& userId,
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Queue.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 858e874..3b54661 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -363,7 +363,7 @@ void Queue::release(const QueueCursor& position, bool markRedelivered)
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- if (!deleted) {
+ if (!isDeleted()) {
Message* message = messages->release(position);
if (message) {
if (!markRedelivered) message->undeliver();
@@ -873,7 +873,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(messageLock);
- return !deleted && checkAutoDelete(locker);
+ return !isDeleted() && checkAutoDelete(locker);
}
bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const
@@ -1191,7 +1191,6 @@ void Queue::notifyDeleted()
QueueListeners::ListenerSet set;
{
Mutex::ScopedLock locker(messageLock);
- deleted = true;
listeners.snapshot(set);
}
set.notifyAll();
@@ -1346,17 +1345,13 @@ void Queue::tryAutoDelete(long expectedVersion)
bool proceed(false);
{
Mutex::ScopedLock locker(messageLock);
- if (!deleted && checkAutoDelete(locker)) {
+ if (!isDeleted() && checkAutoDelete(locker)) {
proceed = true;
}
}
if (proceed) {
- if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) {
- {
- Mutex::ScopedLock locker(messageLock);
- deleted = true;
- }
+ if (broker->getQueues().destroyIfUntouched(shared_from_this(), expectedVersion)) {
if (broker->getAcl())
broker->getAcl()->recordDestroyQueue(name);
@@ -1577,14 +1572,15 @@ void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l)
bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
- if (deleted && !c->hideDeletedError())
+ bool isDel = isDeleted();
+ if (isDel && !c->hideDeletedError())
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
- return !deleted;
+ return !isDel;
}
bool Queue::isDeleted() const
{
- Mutex::ScopedLock lock(messageLock);
+ Mutex::ScopedLock lock(deletionLock);
return deleted;
}
@@ -1703,7 +1699,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
{
Monitor::ScopedLock l(usageLock);
- if (parent.deleted) {
+ if (parent.isDeleted()) {
return false;
} else {
++count;
@@ -1719,8 +1715,8 @@ void Queue::UsageBarrier::release()
void Queue::UsageBarrier::destroy()
{
+ assert(parent.isDeleted());
Monitor::ScopedLock l(usageLock);
- parent.deleted = true;
while (count) usageLock.wait();
}
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Queue.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Queue.h b/src/qpid/broker/Queue.h
index 4b63a41..941af4d 100644
--- a/src/qpid/broker/Queue.h
+++ b/src/qpid/broker/Queue.h
@@ -215,7 +215,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
MessageInterceptors interceptors;
std::string seqNoKey;
Broker* broker;
- bool deleted;
+ bool deleted; // Set exactly once by QueueRegistry
+ mutable qpid::sys::Mutex deletionLock; // Short duration lock specific to "deleted".
UsageBarrier barrier;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageDistributor> allocator;
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/QueueRegistry.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/QueueRegistry.cpp b/src/qpid/broker/QueueRegistry.cpp
index 7310076..429d829 100644
--- a/src/qpid/broker/QueueRegistry.cpp
+++ b/src/qpid/broker/QueueRegistry.cpp
@@ -90,15 +90,19 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
}
void QueueRegistry::destroy(
- const string& name, const string& connectionId, const string& userId)
+ Queue::shared_ptr targetQ, const string& connectionId, const string& userId)
{
Queue::shared_ptr q;
{
qpid::sys::RWlock::ScopedWlock locker(lock);
- QueueMap::iterator i = queues.find(name);
- if (i != queues.end()) {
+ QueueMap::iterator i = queues.find(targetQ->name);
+ if (i != queues.end() && i->second == targetQ) {
q = i->second;
- eraseLH(i, q, name, connectionId, userId);
+ {
+ Mutex::ScopedLock delLocker(q->deletionLock);
+ q->deleted = true;
+ }
+ eraseLH(i, q, q->name, connectionId, userId);
}
}
// Destroy management object, store record etc. The Queue will not
@@ -125,17 +129,21 @@ void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const str
}
-bool QueueRegistry::destroyIfUntouched(const string& name, long version,
+bool QueueRegistry::destroyIfUntouched(Queue::shared_ptr targetQ, long version,
const string& connectionId, const string& userId)
{
Queue::shared_ptr q;
{
qpid::sys::RWlock::ScopedWlock locker(lock);
- QueueMap::iterator i = queues.find(name);
+ QueueMap::iterator i = queues.find(targetQ->name);
if (i != queues.end()) {
- if (i->second->version == version) {
+ if (i->second == targetQ && i->second->version == version) {
q = i->second;
- eraseLH(i, q, name, connectionId, userId);
+ {
+ Mutex::ScopedLock delLocker(q->deletionLock);
+ q->deleted = true;
+ }
+ eraseLH(i, q, q->name, connectionId, userId);
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/QueueRegistry.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/QueueRegistry.h b/src/qpid/broker/QueueRegistry.h
index 0ff96b6..e31f8ce 100644
--- a/src/qpid/broker/QueueRegistry.h
+++ b/src/qpid/broker/QueueRegistry.h
@@ -76,11 +76,11 @@ class QueueRegistry : private QueueFactory {
*
*/
QPID_BROKER_EXTERN void destroy(
- const std::string& name,
+ boost::shared_ptr<Queue> targetQ,
const std::string& connectionId=std::string(),
const std::string& userId=std::string());
- QPID_BROKER_EXTERN bool destroyIfUntouched(const std::string& name, long version,
+ QPID_BROKER_EXTERN bool destroyIfUntouched(boost::shared_ptr<Queue> targetQ, long version,
const std::string& connectionId=std::string(),
const std::string& userId=std::string());
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/SelfDestructQueue.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/SelfDestructQueue.cpp b/src/qpid/broker/SelfDestructQueue.cpp
index 55f644e..c4c3f01 100644
--- a/src/qpid/broker/SelfDestructQueue.cpp
+++ b/src/qpid/broker/SelfDestructQueue.cpp
@@ -33,7 +33,7 @@ SelfDestructQueue::SelfDestructQueue(const std::string& n, const QueueSettings&
bool SelfDestructQueue::checkDepth(const QueueDepth& increment, const Message&)
{
if (settings.maxDepth && (settings.maxDepth - current < increment)) {
- broker->getQueues().destroy(name);
+ broker->getQueues().destroy(shared_from_this());
if (broker->getAcl())
broker->getAcl()->recordDestroyQueue(name);
QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")");
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/amqp/Session.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/amqp/Session.cpp b/src/qpid/broker/amqp/Session.cpp
index e512073..1f25db2 100644
--- a/src/qpid/broker/amqp/Session.cpp
+++ b/src/qpid/broker/amqp/Session.cpp
@@ -627,7 +627,7 @@ void Session::detach(pn_link_t* link, bool closed)
i->second->detached(closed);
boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
if (q && !q->isAutoDelete() && !q->isDeleted()) {
- connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
+ connection.getBroker().deleteQueue(q, connection.getUserId(), connection.getMgmtId());
}
outgoing.erase(i);
QPID_LOG(debug, "Outgoing link detached");
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/tests/QueueRegistryTest.cpp
----------------------------------------------------------------------
diff --git a/src/tests/QueueRegistryTest.cpp b/src/tests/QueueRegistryTest.cpp
index 364d66c..a9045fd 100644
--- a/src/tests/QueueRegistryTest.cpp
+++ b/src/tests/QueueRegistryTest.cpp
@@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testDestroy)
std::pair<Queue::shared_ptr, bool> qc;
qc = reg.declare(foo, QueueSettings());
- reg.destroy(foo);
+ reg.destroy(qc.first);
// Queue is gone from the registry.
BOOST_CHECK(reg.find(foo) == 0);
// Queue is not actually destroyed till we drop our reference.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org