You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/02/21 22:23:42 UTC
svn commit: r629999 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker:
PersistableMessage.cpp PersistableMessage.h Queue.cpp Queue.h
Author: gsim
Date: Thu Feb 21 13:23:37 2008
New Revision: 629999
URL: http://svn.apache.org/viewvc?rev=629999&view=rev
Log:
Fixes to prevent problems with the async store when a queue is deleted before all of its messages are completed or dequeued.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Thu Feb 21 13:23:37 2008
@@ -38,7 +38,10 @@
}
}
for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
- store->flush(*(*i));
+ PersistableQueue::shared_ptr q(i->lock());
+ if (q) {
+ store->flush(*q);
+ }
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Thu Feb 21 13:23:37 2008
@@ -25,6 +25,7 @@
#include <string>
#include <list>
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Monitor.h"
@@ -63,12 +64,12 @@
*/
int asyncDequeueCounter;
protected:
- typedef std::list<PersistableQueue*> syncList;
- syncList synclist;
- MessageStore* store;
- bool contentReleased;
-
- inline void setContentReleased() {contentReleased = true; }
+ typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
+ syncList synclist;
+ MessageStore* store;
+ bool contentReleased;
+
+ inline void setContentReleased() {contentReleased = true; }
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -118,18 +119,20 @@
sys::ScopedLock<sys::Mutex> l(storeLock);
if (store) {
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
- (*i)->notifyDurableIOComplete();
+ PersistableQueue::shared_ptr q(i->lock());
+ if (q) q->notifyDurableIOComplete();
}
//synclist.clear();
}
}
}
- inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) {
+ inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
- synclist.push_back(queue);
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
}
enqueueAsync();
}
@@ -161,11 +164,12 @@
}
}
- inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) {
+ inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
- synclist.push_back(queue);
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
}
dequeueAsync();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 21 13:23:37 2008
@@ -437,7 +437,7 @@
bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
- msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
@@ -450,7 +450,7 @@
bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
- msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
+ msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
store->dequeue(ctxt, pmsg, *this);
return true;
@@ -498,7 +498,9 @@
}
if (store) {
+ store->flush(*this);
store->destroy(*this);
+ store = 0;//ensure we make no more calls to the store for this queue
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=629999&r1=629998&r2=629999&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 21 13:23:37 2008
@@ -26,6 +26,7 @@
#include <deque>
#include <set>
#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
#include "Consumer.h"
@@ -55,13 +56,13 @@
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue : public PersistableQueue, public management::Manageable {
+ class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable {
typedef std::set<Consumer*> Listeners;
typedef std::deque<QueuedMessage> Messages;
const string name;
const bool autodelete;
- MessageStore* const store;
+ MessageStore* store;
const ConnectionToken* owner;
uint32_t consumerCount;
bool exclusive;