You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2018/10/17 23:01:00 UTC

qpid-cpp git commit: QPID-8209: race in queue deletion causes infinite recursion in autodelete

Repository: qpid-cpp
Updated Branches:
  refs/heads/master 0ba019b73 -> f20220f70


QPID-8209: race in queue deletion causes infinite recursion in autodelete


Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/f20220f7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/f20220f7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/f20220f7

Branch: refs/heads/master
Commit: f20220f70fcaf8c6b7efb6e24b35b033aaba5a32
Parents: 0ba019b
Author: Cliff Jansen <cl...@apache.org>
Authored: Wed Oct 17 15:55:49 2018 -0700
Committer: Cliff Jansen <cl...@apache.org>
Committed: Wed Oct 17 15:55:49 2018 -0700

----------------------------------------------------------------------
 src/qpid/broker/Broker.cpp            | 52 +++++++++++++++++-------------
 src/qpid/broker/Broker.h              |  6 ++++
 src/qpid/broker/Queue.cpp             | 24 ++++++--------
 src/qpid/broker/Queue.h               |  3 +-
 src/qpid/broker/QueueRegistry.cpp     | 24 +++++++++-----
 src/qpid/broker/QueueRegistry.h       |  4 +--
 src/qpid/broker/SelfDestructQueue.cpp |  2 +-
 src/qpid/broker/amqp/Session.cpp      |  2 +-
 src/tests/QueueRegistryTest.cpp       |  2 +-
 9 files changed, 69 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Broker.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index 272219c..d214994 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -1473,36 +1473,44 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
     return result;
 }
 
-void Broker::deleteQueue(const std::string& name, const std::string& userId,
+void Broker::deleteQueue(boost::shared_ptr<Queue> queue, const std::string& userId,
                          const std::string& connectionId, QueueFunctor check)
 {
+    const std::string& name = queue->getName();
     QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
                  << " user:" << userId
                  << " rhost:" << connectionId
     );
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        boost::shared_ptr<Exchange> altEx = queue->getAlternateExchange();
+        params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" ));
+        params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy()));
+
+        if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+    }
+    if (check) check(queue);
+    if (acl)
+        acl->recordDestroyQueue(name);
+    Queue::shared_ptr peerQ(queue->getRedirectPeer());
+    if (peerQ)
+        queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
+                             queue->isRedirectSource() ? peerQ : queue,
+                             false);
+    queues.destroy(queue, connectionId, userId);
+
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+                         const std::string& connectionId, QueueFunctor check)
+{
     Queue::shared_ptr queue = queues.find(name);
     if (queue) {
-        if (acl) {
-            std::map<acl::Property, std::string> params;
-            boost::shared_ptr<Exchange> altEx = queue->getAlternateExchange();
-            params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" ));
-            params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE));
-            params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE));
-            params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE));
-            params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy()));
-
-            if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,&params) )
-                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
-        }
-        if (check) check(queue);
-        if (acl)
-            acl->recordDestroyQueue(name);
-        Queue::shared_ptr peerQ(queue->getRedirectPeer());
-        if (peerQ)
-            queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
-                                 queue->isRedirectSource() ? peerQ : queue,
-                                 false);
-        queues.destroy(name, connectionId, userId);
+        deleteQueue(queue, userId, connectionId, check);
     } else {
         throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
     }

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Broker.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h
index 3df28b0..6a7cc8b 100644
--- a/src/qpid/broker/Broker.h
+++ b/src/qpid/broker/Broker.h
@@ -279,6 +279,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
         const std::string& userId,
         const std::string& connectionId);
 
+    void deleteQueue(
+        boost::shared_ptr<Queue> queue,
+        const std::string& userId,
+        const std::string& connectionId,
+        QueueFunctor check = QueueFunctor());
+
     QPID_BROKER_EXTERN void deleteQueue(
         const std::string& name,
         const std::string& userId,

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Queue.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 858e874..3b54661 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -363,7 +363,7 @@ void Queue::release(const QueueCursor& position, bool markRedelivered)
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);
-        if (!deleted) {
+        if (!isDeleted()) {
             Message* message = messages->release(position);
             if (message) {
                 if (!markRedelivered) message->undeliver();
@@ -873,7 +873,7 @@ uint32_t Queue::getConsumerCount() const
 bool Queue::canAutoDelete() const
 {
     Mutex::ScopedLock locker(messageLock);
-    return !deleted && checkAutoDelete(locker);
+    return !isDeleted() && checkAutoDelete(locker);
 }
 
 bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const
@@ -1191,7 +1191,6 @@ void Queue::notifyDeleted()
     QueueListeners::ListenerSet set;
     {
         Mutex::ScopedLock locker(messageLock);
-        deleted = true;
         listeners.snapshot(set);
     }
     set.notifyAll();
@@ -1346,17 +1345,13 @@ void Queue::tryAutoDelete(long expectedVersion)
     bool proceed(false);
     {
         Mutex::ScopedLock locker(messageLock);
-        if (!deleted && checkAutoDelete(locker)) {
+        if (!isDeleted() && checkAutoDelete(locker)) {
             proceed = true;
         }
     }
 
     if (proceed) {
-        if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) {
-            {
-                Mutex::ScopedLock locker(messageLock);
-                deleted = true;
-            }
+        if (broker->getQueues().destroyIfUntouched(shared_from_this(), expectedVersion)) {
             if (broker->getAcl())
                 broker->getAcl()->recordDestroyQueue(name);
 
@@ -1577,14 +1572,15 @@ void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l)
 
 bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
 {
-    if (deleted && !c->hideDeletedError())
+    bool isDel = isDeleted();
+    if (isDel && !c->hideDeletedError())
         throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
-    return !deleted;
+    return !isDel;
 }
 
 bool Queue::isDeleted() const
 {
-    Mutex::ScopedLock lock(messageLock);
+    Mutex::ScopedLock lock(deletionLock);
     return deleted;
 }
 
@@ -1703,7 +1699,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 bool Queue::UsageBarrier::acquire()
 {
     Monitor::ScopedLock l(usageLock);
-    if (parent.deleted) {
+    if (parent.isDeleted()) {
         return false;
     } else {
         ++count;
@@ -1719,8 +1715,8 @@ void Queue::UsageBarrier::release()
 
 void Queue::UsageBarrier::destroy()
 {
+    assert(parent.isDeleted());
     Monitor::ScopedLock l(usageLock);
-    parent.deleted = true;
     while (count) usageLock.wait();
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/Queue.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Queue.h b/src/qpid/broker/Queue.h
index 4b63a41..941af4d 100644
--- a/src/qpid/broker/Queue.h
+++ b/src/qpid/broker/Queue.h
@@ -215,7 +215,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
     MessageInterceptors interceptors;
     std::string seqNoKey;
     Broker* broker;
-    bool deleted;
+    bool deleted;                           // Set exactly once by QueueRegistry
+    mutable qpid::sys::Mutex deletionLock;  // Short duration lock specific to "deleted".
     UsageBarrier barrier;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
     boost::shared_ptr<MessageDistributor> allocator;

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/QueueRegistry.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/QueueRegistry.cpp b/src/qpid/broker/QueueRegistry.cpp
index 7310076..429d829 100644
--- a/src/qpid/broker/QueueRegistry.cpp
+++ b/src/qpid/broker/QueueRegistry.cpp
@@ -90,15 +90,19 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
 }
 
 void QueueRegistry::destroy(
-    const string& name, const string& connectionId, const string& userId)
+    Queue::shared_ptr targetQ, const string& connectionId, const string& userId)
 {
     Queue::shared_ptr q;
     {
         qpid::sys::RWlock::ScopedWlock locker(lock);
-        QueueMap::iterator i = queues.find(name);
-        if (i != queues.end()) {
+        QueueMap::iterator i = queues.find(targetQ->name);
+        if (i != queues.end() && i->second == targetQ) {
             q = i->second;
-            eraseLH(i, q, name, connectionId, userId);
+            {
+                Mutex::ScopedLock delLocker(q->deletionLock);
+                q->deleted = true;
+            }
+            eraseLH(i, q, q->name, connectionId, userId);
         }
     }
     // Destroy management object, store record etc. The Queue will not
@@ -125,17 +129,21 @@ void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const str
 }
 
 
-bool QueueRegistry::destroyIfUntouched(const string& name, long version,
+bool QueueRegistry::destroyIfUntouched(Queue::shared_ptr targetQ, long version,
                                        const string& connectionId, const string& userId)
 {
     Queue::shared_ptr q;
     {
         qpid::sys::RWlock::ScopedWlock locker(lock);
-        QueueMap::iterator i = queues.find(name);
+        QueueMap::iterator i = queues.find(targetQ->name);
         if (i != queues.end()) {
-            if (i->second->version == version) {
+            if (i->second == targetQ && i->second->version == version) {
                 q = i->second;
-                eraseLH(i, q, name, connectionId, userId);
+                {
+                    Mutex::ScopedLock delLocker(q->deletionLock);
+                    q->deleted = true;
+                }
+                eraseLH(i, q, q->name, connectionId, userId);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/QueueRegistry.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/QueueRegistry.h b/src/qpid/broker/QueueRegistry.h
index 0ff96b6..e31f8ce 100644
--- a/src/qpid/broker/QueueRegistry.h
+++ b/src/qpid/broker/QueueRegistry.h
@@ -76,11 +76,11 @@ class QueueRegistry : private QueueFactory {
      *
      */
     QPID_BROKER_EXTERN void destroy(
-        const std::string& name,
+        boost::shared_ptr<Queue> targetQ,
         const std::string& connectionId=std::string(),
         const std::string& userId=std::string());
 
-    QPID_BROKER_EXTERN bool destroyIfUntouched(const std::string& name, long version,
+    QPID_BROKER_EXTERN bool destroyIfUntouched(boost::shared_ptr<Queue> targetQ, long version,
                                                const std::string& connectionId=std::string(),
                                                const std::string& userId=std::string());
 

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/SelfDestructQueue.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/SelfDestructQueue.cpp b/src/qpid/broker/SelfDestructQueue.cpp
index 55f644e..c4c3f01 100644
--- a/src/qpid/broker/SelfDestructQueue.cpp
+++ b/src/qpid/broker/SelfDestructQueue.cpp
@@ -33,7 +33,7 @@ SelfDestructQueue::SelfDestructQueue(const std::string& n, const QueueSettings&
 bool SelfDestructQueue::checkDepth(const QueueDepth& increment, const Message&)
 {
     if (settings.maxDepth && (settings.maxDepth - current < increment)) {
-        broker->getQueues().destroy(name);
+        broker->getQueues().destroy(shared_from_this());
         if (broker->getAcl())
             broker->getAcl()->recordDestroyQueue(name);
         QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")");

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/qpid/broker/amqp/Session.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/amqp/Session.cpp b/src/qpid/broker/amqp/Session.cpp
index e512073..1f25db2 100644
--- a/src/qpid/broker/amqp/Session.cpp
+++ b/src/qpid/broker/amqp/Session.cpp
@@ -627,7 +627,7 @@ void Session::detach(pn_link_t* link, bool closed)
             i->second->detached(closed);
             boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
             if (q && !q->isAutoDelete() && !q->isDeleted()) {
-                connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
+                connection.getBroker().deleteQueue(q, connection.getUserId(), connection.getMgmtId());
             }
             outgoing.erase(i);
             QPID_LOG(debug, "Outgoing link detached");

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/f20220f7/src/tests/QueueRegistryTest.cpp
----------------------------------------------------------------------
diff --git a/src/tests/QueueRegistryTest.cpp b/src/tests/QueueRegistryTest.cpp
index 364d66c..a9045fd 100644
--- a/src/tests/QueueRegistryTest.cpp
+++ b/src/tests/QueueRegistryTest.cpp
@@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testDestroy)
     std::pair<Queue::shared_ptr,  bool> qc;
 
     qc = reg.declare(foo, QueueSettings());
-    reg.destroy(foo);
+    reg.destroy(qc.first);
     // Queue is gone from the registry.
     BOOST_CHECK(reg.find(foo) == 0);
     // Queue is not actually destroyed till we drop our reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org