You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/02/21 22:23:42 UTC

svn commit: r629999 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: PersistableMessage.cpp PersistableMessage.h Queue.cpp Queue.h

Author: gsim
Date: Thu Feb 21 13:23:37 2008
New Revision: 629999

URL: http://svn.apache.org/viewvc?rev=629999&view=rev
Log:
Fixes to prevent problems with the async store when a queue is deleted before all of its messages have been completed or dequeued.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Thu Feb 21 13:23:37 2008
@@ -38,7 +38,10 @@
 	}
     }
     for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
-        store->flush(*(*i));
+        PersistableQueue::shared_ptr q(i->lock());
+        if (q) {
+            store->flush(*q);
+        }
     } 
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Thu Feb 21 13:23:37 2008
@@ -25,6 +25,7 @@
 #include <string>
 #include <list>
 #include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
 #include "Persistable.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Monitor.h"
@@ -63,12 +64,12 @@
      */
     int asyncDequeueCounter;
 protected:
-    typedef std::list<PersistableQueue*> syncList;
-	syncList synclist;
-	MessageStore* store;
-	bool contentReleased;
-
-	inline void setContentReleased() {contentReleased = true; }
+    typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
+    syncList synclist;
+    MessageStore* store;
+    bool contentReleased;
+    
+    inline void setContentReleased() {contentReleased = true; }
 
 public:
     typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -118,18 +119,20 @@
             sys::ScopedLock<sys::Mutex> l(storeLock);
             if (store) {
                 for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
-                    (*i)->notifyDurableIOComplete();
+                    PersistableQueue::shared_ptr q(i->lock());
+                    if (q) q->notifyDurableIOComplete();
                 } 
                 //synclist.clear();
             }            
         }
     }
 
-    inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { 
+    inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
         if (_store){
             sys::ScopedLock<sys::Mutex> l(storeLock);
             store = _store;
-            synclist.push_back(queue);
+            boost::weak_ptr<PersistableQueue> q(queue);
+            synclist.push_back(q);
         }
         enqueueAsync();
     }
@@ -161,11 +164,12 @@
         }
     }
 
-    inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { 
+    inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
         if (_store){
             sys::ScopedLock<sys::Mutex> l(storeLock);
             store = _store;
-            synclist.push_back(queue);
+            boost::weak_ptr<PersistableQueue> q(queue);
+            synclist.push_back(q);
         }
         dequeueAsync();
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 21 13:23:37 2008
@@ -437,7 +437,7 @@
 bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
 {
     if (msg->isPersistent() && store) {
-        msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
+        msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;
@@ -450,7 +450,7 @@
 bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
 {
     if (msg->isPersistent() && store) {
-        msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
+        msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
         store->dequeue(ctxt, pmsg, *this);
         return true;
@@ -498,7 +498,9 @@
     }
 
     if (store) {
+        store->flush(*this);
         store->destroy(*this);
+        store = 0;//ensure we make no more calls to the store for this queue
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 21 13:23:37 2008
@@ -26,6 +26,7 @@
 #include <deque>
 #include <set>
 #include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
 #include "qpid/framing/amqp_types.h"
 #include "ConnectionToken.h"
 #include "Consumer.h"
@@ -55,13 +56,13 @@
          * registered consumers or be stored until dequeued or until one
          * or more consumers registers.
          */
-        class Queue : public PersistableQueue, public management::Manageable {
+        class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable {
             typedef std::set<Consumer*> Listeners;
             typedef std::deque<QueuedMessage> Messages;
 
             const string name;
             const bool autodelete;
-            MessageStore* const store;
+            MessageStore* store;
             const ConnectionToken* owner;
             uint32_t consumerCount;
             bool exclusive;