You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/07/03 11:04:01 UTC
svn commit: r552751 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
Author: gsim
Date: Tue Jul 3 02:04:00 2007
New Revision: 552751
URL: http://svn.apache.org/viewvc?view=rev&rev=552751
Log:
Autodeletable shared queues are now deleted as soon as the consumer count drops to zero (i.e. there is no timeout).
This closes QPID-533.
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Jul 3 02:04:00 2007
@@ -180,7 +180,6 @@
libqpidbroker_la_LIBADD = libqpidcommon.la -ldaemon -lboost_filesystem
libqpidbroker_la_SOURCES = \
qpid/broker/AccumulatedAck.cpp \
- qpid/broker/AutoDelete.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
@@ -241,7 +240,6 @@
nobase_include_HEADERS = \
$(platform_hdr) \
qpid/broker/AccumulatedAck.h \
- qpid/broker/AutoDelete.h \
qpid/broker/BrokerChannel.h \
qpid/broker/BrokerExchange.h \
qpid/broker/BrokerMessage.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jul 3 02:04:00 2007
@@ -80,9 +80,7 @@
config(conf),
store(createStore(conf)),
queues(store.get()),
- timeout(30000),
stagingThreshold(0),
- cleaner(&queues, 100), // clean every 100 auto delete declares.
factory(*this),
dtxManager(store.get())
{
@@ -127,7 +125,6 @@
void Broker::shutdown() {
if (acceptor)
acceptor->shutdown();
-//cct cleaner.cleanNow(); // do we need to delete on close?
}
Broker::~Broker() {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jul 3 02:04:00 2007
@@ -28,7 +28,6 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Acceptor.h"
#include "MessageStore.h"
-#include "AutoDelete.h"
#include "ExchangeRegistry.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
@@ -95,9 +94,7 @@
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
- uint32_t getTimeout() { return timeout; }
uint64_t getStagingThreshold() { return stagingThreshold; }
- AutoDelete& getCleaner() { return cleaner; }
DtxManager& getDtxManager() { return dtxManager; }
private:
@@ -108,9 +105,7 @@
const std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
- uint32_t timeout;
uint64_t stagingThreshold;
- AutoDelete cleaner;
ConnectionFactory factory;
DtxManager dtxManager;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Jul 3 02:04:00 2007
@@ -213,7 +213,7 @@
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
name, durable,
- autoDelete ? connection.getTimeout() : 0,
+ autoDelete && !exclusive,
exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
@@ -229,9 +229,6 @@
//handle automatic cleanup:
if (exclusive) {
connection.exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- broker.getCleaner().add(queue);
- broker.getCleaner().clean(); // check if cleaning is needed
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Tue Jul 3 02:04:00 2007
@@ -249,8 +249,13 @@
}
void Channel::ConsumerImpl::cancel(){
- if(queue)
+ if(queue) {
queue->cancel(this);
+ if (queue->canAutoDelete()) {
+ parent->connection.broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ }
+ }
}
void Channel::ConsumerImpl::requestDispatch(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Jul 3 02:04:00 2007
@@ -36,7 +36,7 @@
using namespace qpid::framing;
using boost::format;
-Queue::Queue(const string& _name, uint32_t _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const ConnectionToken* const _owner) :
@@ -50,7 +50,6 @@
exclusive(0),
persistenceId(0)
{
- if(autodelete) lastUsed = now();
}
Queue::~Queue(){}
@@ -134,7 +133,6 @@
"Exclusive access denied.") %getName());
exclusive = c;
}
- if(autodelete && consumers.empty()) lastUsed = FAR_FUTURE;
consumers.push_back(c);
}
@@ -143,7 +141,6 @@
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
consumers.erase(i);
- if(autodelete && consumers.empty()) lastUsed = now();
if(exclusive == c) exclusive = 0;
}
@@ -192,7 +189,7 @@
bool Queue::canAutoDelete() const{
Mutex::ScopedLock locker(lock);
- return Duration(lastUsed, now()) > autodelete;
+ return autodelete && consumers.size() == 0;
}
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Jul 3 02:04:00 2007
@@ -60,7 +60,7 @@
typedef std::queue<Message::shared_ptr> Messages;
const string name;
- const sys::Duration autodelete;
+ const bool autodelete;
MessageStore* const store;
const ConnectionToken* const owner;
Consumers consumers;
@@ -69,7 +69,6 @@
bool dispatching;
int next;
mutable qpid::sys::Mutex lock;
- sys::AbsTime lastUsed;
Consumer* exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
@@ -89,7 +88,7 @@
typedef std::vector<shared_ptr> vector;
- Queue(const string& name, uint32_t autodelete = 0,
+ Queue(const string& name, bool autodelete = false,
MessageStore* const store = 0,
const ConnectionToken* const owner = 0);
~Queue();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jul 3 02:04:00 2007
@@ -41,7 +41,6 @@
framemax(65536),
heartbeat(0),
client(0),
- timeout(broker.getTimeout()),
stagingThreshold(broker.getStagingThreshold())
{}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jul 3 02:04:00 2007
@@ -62,7 +62,6 @@
uint32_t getFrameMax() const { return framemax; }
uint16_t getHeartbeat() const { return heartbeat; }
- uint32_t getTimeout() const { return timeout; }
uint64_t getStagingThreshold() const { return stagingThreshold; }
void setFrameMax(uint32_t fm) { framemax = fm; }
@@ -98,7 +97,6 @@
uint32_t framemax;
uint16_t heartbeat;
framing::AMQP_ClientProxy::Connection* client;
- const uint32_t timeout; //timeout for auto-deleted queues (in ms)
const uint64_t stagingThreshold;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Jul 3 02:04:00 2007
@@ -31,7 +31,7 @@
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
- uint32_t autoDelete, const ConnectionToken* owner)
+ bool autoDelete, const ConnectionToken* owner)
{
Mutex::ScopedLock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Jul 3 02:04:00 2007
@@ -22,7 +22,7 @@
#define _QueueRegistry_
#include <map>
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
#include "BrokerQueue.h"
namespace qpid {
@@ -36,7 +36,6 @@
*
*/
class QueueRegistry{
-
public:
QueueRegistry(MessageStore* const store = 0);
~QueueRegistry();
@@ -47,7 +46,7 @@
* @return The queue and a boolean flag which is true if the queue
* was created by this declare call false if it already existed.
*/
- std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0,
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false,
const ConnectionToken* const owner = 0);
/**
@@ -63,6 +62,13 @@
*
*/
void destroy(const string& name);
+ template <class Test> void destroyIf(const string& name, Test test)
+ {
+ qpid::sys::Mutex::ScopedLock locker(lock);
+ if (test()) {
+ queues.erase(name);
+ }
+ }
/**
* Find the named queue. Return 0 if not found.
@@ -79,8 +85,7 @@
*/
MessageStore* const getStore() const;
-
- private:
+private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
qpid::sys::Mutex lock;