You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/07/03 11:04:01 UTC

svn commit: r552751 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/

Author: gsim
Date: Tue Jul  3 02:04:00 2007
New Revision: 552751

URL: http://svn.apache.org/viewvc?view=rev&rev=552751
Log:
Auto-delete shared queues are now deleted as soon as their consumer count drops to zero (i.e. deletion is immediate; there is no longer a timeout before an unused queue is removed).
This closes QPID-533.


Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Jul  3 02:04:00 2007
@@ -180,7 +180,6 @@
 libqpidbroker_la_LIBADD = libqpidcommon.la -ldaemon -lboost_filesystem
 libqpidbroker_la_SOURCES = \
   qpid/broker/AccumulatedAck.cpp \
-  qpid/broker/AutoDelete.cpp \
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerAdapter.cpp \
   qpid/broker/BrokerSingleton.cpp \
@@ -241,7 +240,6 @@
 nobase_include_HEADERS = \
   $(platform_hdr) \
   qpid/broker/AccumulatedAck.h \
-  qpid/broker/AutoDelete.h \
   qpid/broker/BrokerChannel.h \
   qpid/broker/BrokerExchange.h \
   qpid/broker/BrokerMessage.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jul  3 02:04:00 2007
@@ -80,9 +80,7 @@
     config(conf),
     store(createStore(conf)),
     queues(store.get()),
-    timeout(30000),
     stagingThreshold(0),
-    cleaner(&queues, 100),  // clean every 100 auto delete declares.
     factory(*this),
     dtxManager(store.get())
 {
@@ -127,7 +125,6 @@
 void Broker::shutdown() {
     if (acceptor)
         acceptor->shutdown();
-//cct    cleaner.cleanNow();  // do we need to delete on close?
 }
 
 Broker::~Broker() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jul  3 02:04:00 2007
@@ -28,7 +28,6 @@
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Acceptor.h"
 #include "MessageStore.h"
-#include "AutoDelete.h"
 #include "ExchangeRegistry.h"
 #include "ConnectionToken.h"
 #include "DirectExchange.h"
@@ -95,9 +94,7 @@
     MessageStore& getStore() { return *store; }
     QueueRegistry& getQueues() { return queues; }
     ExchangeRegistry& getExchanges() { return exchanges; }
-    uint32_t getTimeout() { return timeout; }
     uint64_t getStagingThreshold() { return stagingThreshold; }
-    AutoDelete& getCleaner() { return cleaner; }
     DtxManager& getDtxManager() { return dtxManager; }
     
   private:
@@ -108,9 +105,7 @@
     const std::auto_ptr<MessageStore> store;
     QueueRegistry queues;
     ExchangeRegistry exchanges;
-    uint32_t timeout;
     uint64_t stagingThreshold;
-    AutoDelete cleaner;
     ConnectionFactory factory;
     DtxManager dtxManager;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Jul  3 02:04:00 2007
@@ -213,7 +213,7 @@
 	std::pair<Queue::shared_ptr, bool> queue_created =  
             broker.getQueues().declare(
                 name, durable,
-                autoDelete ? connection.getTimeout() : 0,
+                autoDelete && !exclusive,
                 exclusive ? &connection : 0);
 	queue = queue_created.first;
 	assert(queue);
@@ -229,9 +229,6 @@
             //handle automatic cleanup:
 	    if (exclusive) {
 		connection.exclusiveQueues.push_back(queue);
-	    } else if(autoDelete){
-		broker.getCleaner().add(queue);
-		broker.getCleaner().clean(); // check if cleaning is needed
 	    }
 	}
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Tue Jul  3 02:04:00 2007
@@ -249,8 +249,13 @@
 }
 
 void Channel::ConsumerImpl::cancel(){
-    if(queue)
+    if(queue) {
         queue->cancel(this);
+        if (queue->canAutoDelete()) {            
+            parent->connection.broker.getQueues().destroyIf(queue->getName(), 
+                                                            boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+        }
+    }
 }
 
 void Channel::ConsumerImpl::requestDispatch(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Jul  3 02:04:00 2007
@@ -36,7 +36,7 @@
 using namespace qpid::framing;
 using boost::format;
 
-Queue::Queue(const string& _name, uint32_t _autodelete, 
+Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
              const ConnectionToken* const _owner) :
 
@@ -50,7 +50,6 @@
     exclusive(0),
     persistenceId(0)
 {
-    if(autodelete) lastUsed = now();
 }
 
 Queue::~Queue(){}
@@ -134,7 +133,6 @@
                             "Exclusive access denied.") %getName());
         exclusive = c;
     }
-    if(autodelete && consumers.empty()) lastUsed = FAR_FUTURE;
     consumers.push_back(c);
 }
 
@@ -143,7 +141,6 @@
     Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
     if (i != consumers.end()) 
         consumers.erase(i);
-    if(autodelete && consumers.empty()) lastUsed = now();
     if(exclusive == c) exclusive = 0;
 }
 
@@ -192,7 +189,7 @@
 
 bool Queue::canAutoDelete() const{
     Mutex::ScopedLock locker(lock);
-    return Duration(lastUsed, now()) > autodelete;
+    return autodelete && consumers.size() == 0;
 }
 
 void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Jul  3 02:04:00 2007
@@ -60,7 +60,7 @@
             typedef std::queue<Message::shared_ptr> Messages;
             
             const string name;
-            const sys::Duration autodelete;
+            const bool autodelete;
             MessageStore* const store;
             const ConnectionToken* const owner;
             Consumers consumers;
@@ -69,7 +69,6 @@
             bool dispatching;
             int next;
             mutable qpid::sys::Mutex lock;
-            sys::AbsTime lastUsed;
             Consumer* exclusive;
             mutable uint64_t persistenceId;
             framing::FieldTable settings;
@@ -89,7 +88,7 @@
 
             typedef std::vector<shared_ptr> vector;
 	    
-            Queue(const string& name, uint32_t autodelete = 0, 
+            Queue(const string& name, bool autodelete = false, 
                   MessageStore* const store = 0, 
                   const ConnectionToken* const owner = 0);
             ~Queue();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jul  3 02:04:00 2007
@@ -41,7 +41,6 @@
     framemax(65536), 
     heartbeat(0),
     client(0),
-    timeout(broker.getTimeout()),
     stagingThreshold(broker.getStagingThreshold())
 {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jul  3 02:04:00 2007
@@ -62,7 +62,6 @@
 
     uint32_t getFrameMax() const { return framemax; }
     uint16_t getHeartbeat() const { return heartbeat; }
-    uint32_t getTimeout() const { return timeout; }
     uint64_t getStagingThreshold() const { return stagingThreshold; }
 
     void setFrameMax(uint32_t fm) { framemax = fm; }
@@ -98,7 +97,6 @@
     uint32_t framemax;
     uint16_t heartbeat;
     framing::AMQP_ClientProxy::Connection* client;
-    const uint32_t timeout; //timeout for auto-deleted queues (in ms)
     const uint64_t stagingThreshold;
 
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Jul  3 02:04:00 2007
@@ -31,7 +31,7 @@
 
 std::pair<Queue::shared_ptr, bool>
 QueueRegistry::declare(const string& declareName, bool durable, 
-                       uint32_t autoDelete, const ConnectionToken* owner)
+                       bool autoDelete, const ConnectionToken* owner)
 {
     Mutex::ScopedLock locker(lock);
     string name = declareName.empty() ? generateName() : declareName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=552751&r1=552750&r2=552751
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Jul  3 02:04:00 2007
@@ -22,7 +22,7 @@
 #define _QueueRegistry_
 
 #include <map>
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
 #include "BrokerQueue.h"
 
 namespace qpid {
@@ -36,7 +36,6 @@
  *
  */
 class QueueRegistry{
- 
   public:
     QueueRegistry(MessageStore* const store = 0);
     ~QueueRegistry();
@@ -47,7 +46,7 @@
      * @return The queue and a boolean flag which is true if the queue
      * was created by this declare call false if it already existed.
      */
-    std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0, 
+    std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false, 
                                                const ConnectionToken* const owner = 0);
 
     /**
@@ -63,6 +62,13 @@
      *
      */
     void destroy(const string& name);
+    template <class Test> void destroyIf(const string& name, Test test)
+    {
+        qpid::sys::Mutex::ScopedLock locker(lock);
+        if (test()) {
+            queues.erase(name);
+        }
+    }
 
     /**
      * Find the named queue. Return 0 if not found.
@@ -79,8 +85,7 @@
      */
     MessageStore* const getStore() const;
 
-
-  private:
+private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
     QueueMap queues;
     qpid::sys::Mutex lock;