You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/06/15 08:47:59 UTC
svn commit: r1748523 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h QueueRegistry.cpp QueueRegistry.h
Author: gsim
Date: Wed Jun 15 08:47:59 2016
New Revision: 1748523
URL: http://svn.apache.org/viewvc?rev=1748523&view=rev
Log:
QPID-7302: Restart delayed auto-delete timer if the queue is declared
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1748523&r1=1748522&r2=1748523&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jun 15 08:47:59 2016
@@ -1297,9 +1297,10 @@ boost::shared_ptr<Exchange> Queue::getAl
struct AutoDeleteTask : qpid::sys::TimerTask
{
Queue::shared_ptr queue;
+ long expectedVersion;
AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {}
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q), expectedVersion(q->version) {}
void fire()
{
@@ -1307,7 +1308,7 @@ struct AutoDeleteTask : qpid::sys::Timer
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- queue->tryAutoDelete();
+ queue->tryAutoDelete(expectedVersion);
}
};
@@ -1320,29 +1321,37 @@ void Queue::scheduleAutoDelete(bool imme
broker->getTimer().add(autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated");
} else {
- tryAutoDelete();
+ tryAutoDelete(version);
}
}
}
-void Queue::tryAutoDelete()
+void Queue::tryAutoDelete(long expectedVersion)
{
bool proceed(false);
{
Mutex::ScopedLock locker(messageLock);
if (!deleted && checkAutoDelete(locker)) {
proceed = true;
- deleted = true;
}
}
if (proceed) {
- broker->getQueues().destroy(name);
- if (broker->getAcl())
- broker->getAcl()->recordDestroyQueue(name);
+ if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ deleted = true;
+ }
+ if (broker->getAcl())
+ broker->getAcl()->recordDestroyQueue(name);
- QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
- destroyed();
+ QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
+ destroyed();
+ } else {
+ //queue was accessed since the delayed auto-delete was scheduled, so try again
+ QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name);
+ scheduleAutoDelete();
+ }
} else {
QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1748523&r1=1748522&r2=1748523&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jun 15 08:47:59 2016
@@ -37,6 +37,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
@@ -219,6 +220,7 @@ class Queue : public boost::enable_share
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageDistributor> allocator;
boost::scoped_ptr<Selector> selector;
+ qpid::sys::AtomicCount version;
// Redirect source and target refer to each other. Only one is source.
Queue::shared_ptr redirectPeer;
@@ -271,7 +273,7 @@ class Queue : public boost::enable_share
uint32_t maxTests=0);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
- void tryAutoDelete();
+ void tryAutoDelete(long expectedVersion);
public:
typedef std::vector<shared_ptr> vector;
@@ -533,6 +535,7 @@ class Queue : public boost::enable_share
static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
friend class QueueFactory;
+ friend class QueueRegistry;
};
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1748523&r1=1748522&r2=1748523&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Wed Jun 15 08:47:59 2016
@@ -74,6 +74,7 @@ QueueRegistry::declare(const string& nam
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
+ ++(i->second->version);
}
if (getBroker() && getBroker()->getManagementAgent()) {
getBroker()->getManagementAgent()->raiseEvent(
@@ -97,17 +98,41 @@ void QueueRegistry::destroy(
QueueMap::iterator i = queues.find(name);
if (i != queues.end()) {
q = i->second;
- queues.erase(i);
- if (getBroker()) {
- // NOTE: queueDestroy and raiseEvent must be called with the
- // lock held in order to ensure events are generated
- // in the correct order.
- getBroker()->getBrokerObservers().queueDestroy(q);
- if (getBroker()->getManagementAgent())
- getBroker()->getManagementAgent()->raiseEvent(
- _qmf::EventQueueDelete(connectionId, userId, name));
+ eraseLH(i, q, name, connectionId, userId);
+ }
+ }
+}
+
+void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const string& name, const string& connectionId, const string& userId)
+{
+ queues.erase(i);
+ if (getBroker()) {
+ // NOTE: queueDestroy and raiseEvent must be called with the
+ // lock held in order to ensure events are generated
+ // in the correct order.
+ getBroker()->getBrokerObservers().queueDestroy(q);
+ if (getBroker()->getManagementAgent())
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDelete(connectionId, userId, name));
+ }
+}
+
+
+bool QueueRegistry::destroyIfUntouched(const string& name, long version,
+ const string& connectionId, const string& userId)
+{
+ Queue::shared_ptr q;
+ {
+ qpid::sys::RWlock::ScopedWlock locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i != queues.end()) {
+ q = i->second;
+ if (q->version == version) {
+ eraseLH(i, q, name, connectionId, userId);
+ return true;
}
}
+ return false;
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1748523&r1=1748522&r2=1748523&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Wed Jun 15 08:47:59 2016
@@ -80,15 +80,9 @@ class QueueRegistry : private QueueFacto
const std::string& connectionId=std::string(),
const std::string& userId=std::string());
- template <class Test> bool destroyIf(const std::string& name, Test test)
- {
- if (test()) {
- destroy(name);
- return true;
- } else {
- return false;
- }
- }
+ QPID_BROKER_EXTERN bool destroyIfUntouched(const std::string& name, long version,
+ const std::string& connectionId=std::string(),
+ const std::string& userId=std::string());
/**
* Find the named queue. Return 0 if not found.
@@ -126,6 +120,8 @@ private:
typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
mutable qpid::sys::RWlock lock;
+
+ void eraseLH(QueueMap::iterator, boost::shared_ptr<Queue>, const std::string& name, const std::string& connectionId, const std::string& userId);
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org