You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2010/11/04 23:52:11 UTC
svn commit: r1031325 - in /qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs:
MSSqlClfsProvider.cpp Messages.cpp Messages.h
Author: shuston
Date: Thu Nov 4 22:52:10 2010
New Revision: 1031325
URL: http://svn.apache.org/viewvc?rev=1031325&view=rev
Log:
Revised Messages to keep track of queues each message is on to enable finding all messages on a deleted queue. Removed this type of tracking from MSSqlClfsProvider.cpp.
Modified:
qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1031325&r1=1031324&r2=1031325&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Thu Nov 4 22:52:10 2010
@@ -21,6 +21,7 @@
#include <list>
#include <map>
+#include <set>
#include <stdlib.h>
#include <string>
#include <windows.h>
@@ -68,21 +69,6 @@ const std::string TblConfig("tblConfig")
const std::string TblExchange("tblExchange");
const std::string TblQueue("tblQueue");
-/*
- * Maintain a map of id -> QueueContents. RWlock protecting the map allows
- * concurrent reads so multiple threads can get access to the needed queue;
- * queue lock protects the QueueContents themselves.
- */
-struct QueueContents {
- typedef boost::shared_ptr<QueueContents> shared_ptr;
- qpid::sys::Mutex lock;
- std::list<uint64_t> messages;
-};
-
-typedef std::map<uint64_t, QueueContents::shared_ptr> QueuesMap;
-qpid::sys::RWlock queuesLock;
-QueuesMap queues;
-
}
namespace qpid {
@@ -494,12 +480,6 @@ MSSqlClfsProvider::create(PersistableQue
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsQueues.add(queue);
- {
- // Db stuff ok so far; add an empty QueueContents for the queue.
- QueueContents::shared_ptr entry(new QueueContents);
- qpid::sys::ScopedWlock<qpid::sys::RWlock> l(queuesLock);
- queues[queue.getPersistenceId()] = entry;
- }
db->commitTransaction();
}
catch(_com_error &e) {
@@ -539,44 +519,13 @@ MSSqlClfsProvider::destroy(PersistableQu
}
/*
- * Now that the SQL stuff has recorded the queue deletion, reflect
- * all the dequeued messages in memory. Don't worry about any errors
- * that occur while reflecting these in the log because:
- * - If we have to recover from this point (or anywhere from here
- * until all messages are dequeued) there's no valid queue ID
- * from the Enqueue record, so recovery will throw it out anyway.
- * - If there is a failure before the SQL changes commit, the
- * existing Enqueue records will replace the message on the
- * queue during recovery.
- * so, the best we could do by logging these dequeue operations is
- * record something that will need to be ignored during recovery.
- *
- * Obtain a write lock to the queue map. Doing so gets this thread
- * exclusive access to the queue map. This means no other thread can
- * come while we're holding it and access, even for read, the list.
- * However, there may already be other previously obtained references
- * to the queue's message list outstanding, so also get the queue's
- * list lock to serialize with any other threads. We should be able
- * to count on the broker not making the destroy() call while other
- * uses of the queue are outstanding, but play it safe.
+ * Now that the SQL stuff has recorded the queue deletion, expunge
+ * all record of the queue from the messages set. Any errors logging
+ * these removals are swallowed because during a recovery the queue
+ * Id won't be present (the SQL stuff already committed) so any references
+ * to it in message operations will be removed.
*/
- std::list<uint64_t> affectedMessages;
- uint64_t qId = queue.getPersistenceId();
- {
- ::qpid::sys::RWlock::ScopedWlock l(queuesLock);
- QueueContents::shared_ptr q = queues[qId];
- {
- ::qpid::sys::Mutex::ScopedLock ql(q->lock);
- affectedMessages = q->messages;
- }
- queues.erase(queues.find(qId));
- }
- // Now tell each of the messages they are less one queue commitment.
- Transaction::shared_ptr nonTransactional;
- BOOST_FOREACH(uint64_t msgId, affectedMessages) {
- QPID_LOG(debug, "Removing message " << msgId);
- messages.dequeue(msgId, qId, nonTransactional);
- }
+ messages.expunge(queue.getPersistenceId());
}
/**
@@ -856,25 +805,12 @@ MSSqlClfsProvider::enqueue(qpid::broker:
if (tctx)
t = tctx->getTransaction();
}
- uint64_t qId = queue.getPersistenceId();
uint64_t msgId = msg->getPersistenceId();
- QueueContents::shared_ptr q;
- {
- qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock);
- QueuesMap::iterator i = queues.find(qId);
- if (i == queues.end())
- THROW_STORE_EXCEPTION("Queue does not exist");
- q = i->second;
- }
if (msgId == 0) {
messages.add(msg);
msgId = msg->getPersistenceId();
}
- messages.enqueue(msgId, qId, t);
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock);
- q->messages.push_back(msgId);
- }
+ messages.enqueue(msgId, queue.getPersistenceId(), t);
msg->enqueueComplete();
}
@@ -902,21 +838,7 @@ MSSqlClfsProvider::dequeue(qpid::broker:
if (tctx)
t = tctx->getTransaction();
}
- uint64_t qId = queue.getPersistenceId();
- uint64_t msgId = msg->getPersistenceId();
- QueueContents::shared_ptr q;
- {
- qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock);
- QueuesMap::const_iterator i = queues.find(qId);
- if (i == queues.end())
- THROW_STORE_EXCEPTION("Queue does not exist");
- q = i->second;
- }
- messages.dequeue(msgId, qId, t);
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock);
- q->messages.remove(msgId);
- }
+ messages.dequeue(msg->getPersistenceId(), queue.getPersistenceId(), t);
msg->dequeueComplete();
}
@@ -1063,8 +985,6 @@ MSSqlClfsProvider::recoverQueues(qpid::b
recoverer.recoverQueue(blob);
queue->setPersistenceId(id);
queueMap[id] = queue;
- QueueContents::shared_ptr entry(new QueueContents);
- queues[id] = entry;
p->MoveNext();
}
}
@@ -1085,9 +1005,28 @@ MSSqlClfsProvider::recoverMessages(qpid:
MessageMap& messageMap,
MessageQueueMap& messageQueueMap)
{
+ // Read the list of valid queue Ids to ensure that no broken msg->queue
+ // refs get restored.
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ std::set<uint64_t> validQueues;
+ if (!(p->BOF && p->EndOfFile)) {
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ validQueues.insert(id);
+ p->MoveNext();
+ }
+ }
std::map<uint64_t, Transaction::shared_ptr> transMap;
transactions->recover(transMap);
- messages.recover(recoverer, messageMap, messageQueueMap, transMap);
+ messages.recover(recoverer,
+ validQueues,
+ transMap,
+ messageMap,
+ messageQueueMap);
}
void
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp?rev=1031325&r1=1031324&r2=1031325&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp Thu Nov 4 22:52:10 2010
@@ -24,6 +24,7 @@
#include "Messages.h"
#include "Lsn.h"
#include "qpid/store/StoreException.h"
+#include <boost/foreach.hpp>
namespace qpid {
namespace store {
@@ -67,37 +68,25 @@ Messages::enqueue(uint64_t msgId, uint64
THROW_STORE_EXCEPTION("Message does not exist");
p = i->second;
}
- // If transacted, it still needs to be counted as enqueued to ensure it
- // is not deleted. Remember the transacted operation so it can be properly
- // resolved later.
- ::InterlockedIncrement(&p->enqueuedCount);
- uint64_t transactionId = 0;
- if (t.get() != 0)
- transactionId = t->getId();
- if (transactionId != 0) {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
- p->transOps[t].push_back(MessageInfo::TRANSACTION_ENQUEUE);
- t->enroll(msgId);
- }
- try {
- log.recordEnqueue(msgId, queueId, transactionId);
- }
- catch (...) {
- // Undo the record-keeping if the log wasn't written correctly.
- ::InterlockedDecrement(&p->enqueuedCount);
- if (transactionId != 0) {
- t->unenroll(msgId);
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
- std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
- std::vector<MessageInfo::TransType>::iterator i;
- for (i = oplist.begin(); i < oplist.end(); ++i) {
- if (*i == MessageInfo::TRANSACTION_ENQUEUE) {
- oplist.erase(i);
- break;
- }
- }
+ MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ p->where.push_back(loc);
+ uint64_t transactionId = 0;
+ if (t.get() != 0) {
+ transactionId = t->getId();
+ t->enroll(msgId);
+ }
+ try {
+ log.recordEnqueue(msgId, queueId, transactionId);
+ }
+ catch (...) {
+ // Undo the record-keeping if the log wasn't written correctly.
+ if (transactionId != 0)
+ t->unenroll(msgId);
+ p->where.pop_back();
+ throw;
}
- throw;
}
}
@@ -112,38 +101,46 @@ Messages::dequeue(uint64_t msgId, uint64
THROW_STORE_EXCEPTION("Message does not exist");
p = i->second;
}
- // Remember the transacted operation so it can be properly resolved later.
- uint64_t transactionId = 0;
- if (t.get() != 0)
- transactionId = t->getId();
- if (transactionId != 0) {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
- p->transOps[t].push_back(MessageInfo::TRANSACTION_DEQUEUE);
- t->enroll(msgId);
- }
- try {
- log.recordDequeue(msgId, queueId, transactionId);
- }
- catch(...) {
+ {
+ // Locate the 'where' entry for the specified queue. Once this operation
+ // is recorded in the log, update the 'where' entry to reflect it.
+ // Note that an existing entry in 'where' that refers to a transaction
+ // is not eligible for this operation.
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i;
+ for (i = p->where.begin(); i != p->where.end(); ++i) {
+ if (i->queueId == queueId && i->transaction.get() == 0)
+ break;
+ }
+ if (i == p->where.end())
+ THROW_STORE_EXCEPTION("Message not on queue");
+ uint64_t transactionId = 0;
+ if (t.get() != 0) {
+ transactionId = t->getId();
+ t->enroll(msgId);
+ }
+ try {
+ log.recordDequeue(msgId, queueId, transactionId);
+ }
+ catch (...) {
+ // Undo the record-keeping if the log wasn't written correctly.
+ if (transactionId != 0)
+ t->unenroll(msgId);
+ throw;
+ }
+ // Ok, logged successfully. If this is a transactional op, note
+ // the transaction. If non-transactional, remove the 'where' entry.
if (transactionId != 0) {
- t->unenroll(msgId);
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
- std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
- std::vector<MessageInfo::TransType>::iterator i;
- for (i = oplist.begin(); i < oplist.end(); ++i) {
- if (*i == MessageInfo::TRANSACTION_DEQUEUE) {
- oplist.erase(i);
- break;
- }
- }
+ i->transaction = t;
+ i->disposition = MessageInfo::TRANSACTION_DEQUEUE;
+ }
+ else {
+ p->where.erase(i);
+ // If the message doesn't exist on any other queues, remove it.
+ if (p->where.empty())
+ remove(msgId);
}
- throw;
}
-
- // If transacted, leave the reference until the transaction commits.
- if (transactionId == 0)
- if (::InterlockedDecrement(&p->enqueuedCount) == 0)
- remove(msgId);
}
// Commit a previous provisional enqueue or dequeue of a particular message
@@ -161,22 +158,23 @@ Messages::commit(uint64_t msgId, Transac
p = i->second;
}
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
- std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
- std::vector<MessageInfo::TransType>::iterator i;
- for (i = oplist.begin(); i < oplist.end(); ++i) {
- // Transactional dequeues left the ref count alone until commit
- // while transaction enqueues already incremented it.
- if (*i == MessageInfo::TRANSACTION_DEQUEUE)
- ::InterlockedDecrement(&p->enqueuedCount);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i;
+ for (i = p->where.begin(); i != p->where.end(); ++i) {
+ if (i->transaction != t)
+ continue;
+ // Transactional dequeues can now remove the item from the
+ // where list; enqueues just clear the transaction reference.
+ if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE)
+ i = p->where.erase(i);
+ else
+ i->transaction.reset();
}
- // Remember, last deref of Transaction::shared_ptr deletes Transaction.
- p->transOps.erase(t);
}
// If committing results in this message having no further enqueue
// references, delete it. If the delete fails, swallow the exception
// and let recovery take care of removing it later.
- if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) {
+ if (p->where.empty()) {
try {
remove(msgId);
}
@@ -199,22 +197,28 @@ Messages::abort(uint64_t msgId, Transact
p = i->second;
}
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
- std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
- std::vector<MessageInfo::TransType>::iterator i;
- for (i = oplist.begin(); i < oplist.end(); ++i) {
- // Transactional enqueues incremented the ref count when seen;
- // while transaction dequeues left it alone.
- if (*i == MessageInfo::TRANSACTION_ENQUEUE)
- ::InterlockedDecrement(&p->enqueuedCount);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i = p->where.begin();
+ while (i != p->where.end()) {
+ if (i->transaction != t) {
+ ++i;
+ continue;
+ }
+ // Aborted transactional dequeues result in the message remaining
+ // enqueued like before the operation; enqueues clear the
+ // message from the where list - like the enqueue never happened.
+ if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE)
+ i = p->where.erase(i);
+ else {
+ i->transaction.reset();
+ ++i;
+ }
}
- // Remember, last deref of Transaction::shared_ptr deletes Transaction.
- p->transOps.erase(t);
}
- // If committing results in this message having no further enqueue
+ // If aborting results in this message having no further enqueue
// references, delete it. If the delete fails, swallow the exception
// and let recovery take care of removing it later.
- if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) {
+ if (p->where.empty()) {
try {
remove(msgId);
}
@@ -237,9 +241,10 @@ Messages::loadContent(uint64_t msgId,
// the log.
void
Messages::recover(qpid::broker::RecoveryManager& recoverer,
+ const std::set<uint64_t> &validQueues,
+ const std::map<uint64_t, Transaction::shared_ptr>& transMap,
qpid::store::MessageMap& messageMap,
- qpid::store::MessageQueueMap& messageQueueMap,
- const std::map<uint64_t, Transaction::shared_ptr>& transMap)
+ qpid::store::MessageQueueMap& messageQueueMap)
{
std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps;
log.recover(recoverer, messageMap, messageOps);
@@ -264,9 +269,16 @@ Messages::recover(qpid::broker::Recovery
std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
for (op = ops.begin(); op != ops.end(); ++op) {
QueueEntry entry(op->queueId);
+ MessageInfo::Location loc(op->queueId);
std::string dir =
op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue"
: "dequeue";
+ if (validQueues.find(op->queueId) == validQueues.end()) {
+ QPID_LOG(info,
+ "Message " << msgId << dir << " on non-existant queue "
+ << op->queueId << "; dropped");
+ continue;
+ }
if (op->txnId != 0) {
// Be sure to enroll this message in the transaction even if
// it has committed or aborted. This ensures that the
@@ -276,22 +288,18 @@ Messages::recover(qpid::broker::Recovery
// it couldn't be recovered again.
//
// Recall what is being reconstructed; 2 things:
- // 1. This class's 'messages' list which only keeps track
- // of how many queues reference each message (though NOT
- // which queues) and the transactions each message is
- // enrolled in. For this, aborted transactions cause the
- // result of the operation to be ignored, but the
- // message does need to be enrolled in the transaction
- // to properly maintain the transaction references until
- // the message is deleted.
- // 2. The StorageProvider's MessageQueueMap, which DOES
- // have an entry for each queue each message is on and
+ // 1. This class's 'messages' list which keeps track
+ // of the queues each message is on and the transactions
+ // each message is enrolled in. For this, aborted
+ // transactions cause the result of the operation to be
+ // ignored, but the message does need to be enrolled in
+ // the transaction to properly maintain the transaction
+ // references until the message is deleted.
+ // 2. The StorageProvider's MessageQueueMap, which also
+ // has an entry for each queue each message is on and
// its TPL status and associated xid.
const Transaction::shared_ptr &t =
transMap.find(op->txnId)->second;
- // Adds t to map, ensuring a reference to Transaction, even if
- // no ops are added to the TransType vector.
- std::vector<MessageInfo::TransType>& tOps = m->transOps[t];
// Prepared transactions cause the operation to be
// provisionally acted on, and the message to be enrolled in
// the transaction for when it commits/aborts. This is
@@ -304,13 +312,14 @@ Messages::recover(qpid::broker::Recovery
THROW_STORE_EXCEPTION("Invalid transaction state");
t->enroll(msgId);
entry.xid = tpct->getXid();
+ loc.transaction = t;
if (op->op == MessageLog::RECOVERED_ENQUEUE) {
- tOps.push_back(MessageInfo::TRANSACTION_ENQUEUE);
entry.tplStatus = QueueEntry::ADDING;
+ loc.disposition = MessageInfo::TRANSACTION_ENQUEUE;
}
else {
- tOps.push_back(MessageInfo::TRANSACTION_DEQUEUE);
entry.tplStatus = QueueEntry::REMOVING;
+ loc.disposition = MessageInfo::TRANSACTION_DEQUEUE;
}
}
else if (t->getState() != Transaction::TRANS_COMMITTED) {
@@ -329,42 +338,107 @@ Messages::recover(qpid::broker::Recovery
// it if the current op is non-transactional; if it's a prepared
// transaction then replace the existing entry with the current
// one that notes the message is enqueued but being removed under
- // a prepared transaciton.
+ // a prepared transaction.
QPID_LOG(debug, dir + " at queue " << entry.queueId);
if (op->op == MessageLog::RECOVERED_ENQUEUE) {
entries.push_back(entry);
+ m->where.push_back(loc);
}
else {
std::vector<QueueEntry>::iterator i = entries.begin();
while (i != entries.end()) {
if (i->queueId == entry.queueId) {
- *i = entry;
+ if (entry.tplStatus != QueueEntry::NONE)
+ *i = entry;
+ else
+ entries.erase(i);
break;
}
++i;
}
+ std::list<MessageInfo::Location>::iterator w = m->where.begin();
+ while (w != m->where.end()) {
+ if (w->queueId == loc.queueId) {
+ if (loc.transaction.get() != 0)
+ *w = loc;
+ else
+ m->where.erase(w);
+ }
+ }
}
}
- // Now that all the queue entries have been set correctly, the
- // enqueuedCount that MessageInfo keeps track of is simply the
- // number of queue map entries. If there are none, add this
- // message to the homeless list to be deleted from the log after
- // the recovery is done.
- if ((m->enqueuedCount = entries.size()) == 0) {
+ // Now that all the queue entries have been set correctly, see if
+ // there are any entries; they may have all been removed during
+ // recovery. If there are none, add this message to the homeless
+ // list to be deleted from the log after the recovery is done.
+ if (m->where.size() == 0) {
homeless.push_back(msgId);
messageMap.erase(msgId);
messageQueueMap.erase(msgId);
}
- std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
- messages.insert(p);
+ else {
+ std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
+ messages.insert(p);
+ }
}
QPID_LOG(debug, "Message log recovery done.");
// Done! Ok, go back and delete all the homeless messages.
- for (std::vector<uint64_t>::iterator i = homeless.begin();
- i != homeless.end();
- ++i) {
- QPID_LOG(debug, "Deleting homeless message " << *i);
- remove(*i);
+ BOOST_FOREACH(uint64_t msg, homeless) {
+ QPID_LOG(debug, "Deleting homeless message " << msg);
+ remove(msg);
+ }
+}
+
+// Expunge is called when a queue is deleted. All references to that
+// queue must be expunged from all messages. 'Dequeue' log records are
+// written for each queue entry removed, but any errors are swallowed.
+// On recovery there's a list of valid queues passed in. The deleted
+// queue will not be on that list so if any references to it are
+// recovered they'll get weeded out then.
+void
+Messages::expunge(uint64_t queueId)
+{
+ std::vector<uint64_t> toBeDeleted; // Messages to be deleted later.
+
+ {
+ // Lock everybody out since all messages are possibly in play.
+ // There also may be other threads already working on particular
+ // messages so individual message mutex still must be acquired.
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ MessageMap::iterator m;
+ for (m = messages.begin(); m != messages.end(); ++m) {
+ MessageInfo::shared_ptr p = m->second;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i = p->where.begin();
+ while (i != p->where.end()) {
+ if (i->queueId != queueId) {
+ ++i;
+ continue;
+ }
+ // If this entry is involved in a transaction, unenroll it.
+ // Then remove the entry.
+ if (i->transaction.get() != 0)
+ i->transaction->unenroll(m->first);
+ i = p->where.erase(i);
+ try {
+ log.recordDequeue(m->first, queueId, 0);
+ }
+ catch(...) {
+ }
+ }
+ if (p->where.size() == 0)
+ toBeDeleted.push_back(m->first);
+ }
+ }
+ }
+ // Swallow any exceptions during this; don't care. Recover it later
+ // if needed.
+ try {
+ BOOST_FOREACH(uint64_t msg, toBeDeleted)
+ remove(msg);
+ }
+ catch(...) {
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h?rev=1031325&r1=1031324&r2=1031325&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Thu Nov 4 22:52:10 2010
@@ -24,6 +24,7 @@
#include <windows.h>
#include <map>
+#include <set>
#include <vector>
#include <boost/intrusive_ptr.hpp>
#include <qpid/broker/PersistableMessage.h>
@@ -46,9 +47,38 @@ class Messages {
// Keep a list of transactional operations this message is
// referenced in. When the transaction changes/finalizes these all
// need to be acted on.
- typedef enum { TRANSACTION_ENQUEUE, TRANSACTION_DEQUEUE } TransType;
+ typedef enum { TRANSACTION_NONE = 0,
+ TRANSACTION_ENQUEUE,
+ TRANSACTION_DEQUEUE } TransType;
+#if 0
std::map<Transaction::shared_ptr, std::vector<TransType> > transOps;
qpid::sys::Mutex transOpsLock;
+#endif
+ // Think what I need is a list of "where is this message" - queue id,
+ // transaction ref, what kind of trans op (enq/deq). Then "remove all
+ // queue refs" can search through all messages looking for queue ids
+ // and undo them. Write "remove from queue" record to log. Also need to
+ // add "remove from queue" to recovery.
+ struct Location {
+ uint64_t queueId;
+ Transaction::shared_ptr transaction;
+ TransType disposition;
+
+ Location(uint64_t q)
+ : queueId(q), transaction(), disposition(TRANSACTION_NONE) {}
+ Location(uint64_t q, Transaction::shared_ptr& t, TransType d)
+ : queueId(q), transaction(t), disposition(d) {}
+ };
+ qpid::sys::Mutex whereLock;
+ std::list<Location> where;
+ // The transactions vector just keeps a shared_ptr to each
+ // Transaction this message was involved in, regardless of the
+ // disposition or transaction state. Keeping a valid shared_ptr
+ // prevents the Transaction from being deleted. As long as there
+ // are any messages that referred to a transaction, that
+ // transaction's state needs to be known so the message disposition
+ // can be correctly recovered if needed.
+ std::vector<Transaction::shared_ptr> transactions;
typedef boost::shared_ptr<MessageInfo> shared_ptr;
@@ -96,12 +126,17 @@ public:
uint64_t offset,
uint32_t length);
+ // Expunge is called when a queue is deleted. All references to that
+ // queue must be expunged from all messages.
+ void expunge(uint64_t queueId);
+
// Recover the current set of messages and where they're queued from
// the log.
void recover(qpid::broker::RecoveryManager& recoverer,
+ const std::set<uint64_t> &validQueues,
+ const std::map<uint64_t, Transaction::shared_ptr>& transMap,
qpid::store::MessageMap& messageMap,
- qpid::store::MessageQueueMap& messageQueueMap,
- const std::map<uint64_t, Transaction::shared_ptr>& transMap);
+ qpid::store::MessageQueueMap& messageQueueMap);
};
}}} // namespace qpid::store::ms_clfs
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org