You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2010/11/04 23:52:11 UTC

svn commit: r1031325 - in /qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs: MSSqlClfsProvider.cpp Messages.cpp Messages.h

Author: shuston
Date: Thu Nov  4 22:52:10 2010
New Revision: 1031325

URL: http://svn.apache.org/viewvc?rev=1031325&view=rev
Log:
Revised Messages to keep track of queues each message is on to enable finding all messages on a deleted queue. Removed this type of tracking from MSSqlClfsProvider.cpp.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1031325&r1=1031324&r2=1031325&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Thu Nov  4 22:52:10 2010
@@ -21,6 +21,7 @@
 
 #include <list>
 #include <map>
+#include <set>
 #include <stdlib.h>
 #include <string>
 #include <windows.h>
@@ -68,21 +69,6 @@ const std::string TblConfig("tblConfig")
 const std::string TblExchange("tblExchange");
 const std::string TblQueue("tblQueue");
 
-/*
- * Maintain a map of id -> QueueContents. RWlock protecting the map allows
- * concurrent reads so multiple threads can get access to the needed queue;
- * queue lock protects the QueueContents themselves.
- */
-struct QueueContents {
-    typedef boost::shared_ptr<QueueContents> shared_ptr;
-    qpid::sys::Mutex lock;
-    std::list<uint64_t> messages;
-};
-
-typedef std::map<uint64_t, QueueContents::shared_ptr> QueuesMap;
-qpid::sys::RWlock queuesLock;
-QueuesMap queues;
-
 }
 
 namespace qpid {
@@ -494,12 +480,6 @@ MSSqlClfsProvider::create(PersistableQue
         db->beginTransaction();
         rsQueues.open(db, TblQueue);
         rsQueues.add(queue);
-        {
-            // Db stuff ok so far; add an empty QueueContents for the queue.
-            QueueContents::shared_ptr entry(new QueueContents);
-            qpid::sys::ScopedWlock<qpid::sys::RWlock> l(queuesLock);
-            queues[queue.getPersistenceId()] = entry;
-        }
         db->commitTransaction();
     }
     catch(_com_error &e) {
@@ -539,44 +519,13 @@ MSSqlClfsProvider::destroy(PersistableQu
     }
 
     /*
-     * Now that the SQL stuff has recorded the queue deletion, reflect
-     * all the dequeued messages in memory. Don't worry about any errors
-     * that occur while reflecting these in the log because:
-     *   - If we have to recover from this point (or anywhere from here
-     *     until all messages are dequeued) there's no valid queue ID
-     *     from the Enqueue record, so recovery will throw it out anyway.
-     *   - If there is a failure before the SQL changes commit, the
-     *     existing Enqueue records will replace the message on the
-     *     queue during recovery.
-     * so, the best we could do by logging these dequeue operations is
-     * record something that will need to be ignored during recovery.
-     *
-     * Obtain a write lock to the queue map. Doing so gets this thread
-     * exclusive access to the queue map. This means no other thread can
-     * come while we're holding it and access, even for read, the list.
-     * However, there may already be other previously obtained references
-     * to the queue's message list outstanding, so also get the queue's
-     * list lock to serialize with any other threads. We should be able
-     * to count on the broker not making the destroy() call while other
-     * uses of the queue are outstanding, but play it safe.
+     * Now that the SQL stuff has recorded the queue deletion, expunge
+     * all record of the queue from the messages set. Any errors logging
+     * these removals are swallowed because during a recovery the queue
+     * Id won't be present (the SQL stuff already committed) so any references
+     * to it in message operations will be removed.
      */
-    std::list<uint64_t> affectedMessages;
-    uint64_t qId = queue.getPersistenceId();
-    {
-        ::qpid::sys::RWlock::ScopedWlock l(queuesLock);
-        QueueContents::shared_ptr q = queues[qId];
-        {
-            ::qpid::sys::Mutex::ScopedLock ql(q->lock);
-            affectedMessages = q->messages;
-        }
-        queues.erase(queues.find(qId));
-    }
-    // Now tell each of the messages they are less one queue commitment.
-    Transaction::shared_ptr nonTransactional;
-    BOOST_FOREACH(uint64_t msgId, affectedMessages) {
-        QPID_LOG(debug, "Removing message " << msgId);
-        messages.dequeue(msgId, qId, nonTransactional);
-    }
+    messages.expunge(queue.getPersistenceId());
 }
 
 /**
@@ -856,25 +805,12 @@ MSSqlClfsProvider::enqueue(qpid::broker:
         if (tctx)
             t = tctx->getTransaction();
     }
-    uint64_t qId = queue.getPersistenceId();
     uint64_t msgId = msg->getPersistenceId();
-    QueueContents::shared_ptr q;
-    {
-        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock);
-        QueuesMap::iterator i = queues.find(qId);
-        if (i == queues.end())
-            THROW_STORE_EXCEPTION("Queue does not exist");
-        q = i->second;
-    }
     if (msgId == 0) {
         messages.add(msg);
         msgId = msg->getPersistenceId();
     }
-    messages.enqueue(msgId, qId, t);
-    {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock);
-        q->messages.push_back(msgId);
-    }
+    messages.enqueue(msgId, queue.getPersistenceId(), t);
     msg->enqueueComplete();
 }
 
@@ -902,21 +838,7 @@ MSSqlClfsProvider::dequeue(qpid::broker:
         if (tctx)
             t = tctx->getTransaction();
     }
-    uint64_t qId = queue.getPersistenceId();
-    uint64_t msgId = msg->getPersistenceId();
-    QueueContents::shared_ptr q;
-    {
-        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock);
-        QueuesMap::const_iterator i = queues.find(qId);
-        if (i == queues.end())
-            THROW_STORE_EXCEPTION("Queue does not exist");
-        q = i->second;
-    }
-    messages.dequeue(msgId, qId, t);
-    {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock);
-        q->messages.remove(msgId);
-    }
+    messages.dequeue(msg->getPersistenceId(), queue.getPersistenceId(), t);
     msg->dequeueComplete();
 }
 
@@ -1063,8 +985,6 @@ MSSqlClfsProvider::recoverQueues(qpid::b
             recoverer.recoverQueue(blob);
         queue->setPersistenceId(id);
         queueMap[id] = queue;
-        QueueContents::shared_ptr entry(new QueueContents);
-        queues[id] = entry;
         p->MoveNext();
     }
 }
@@ -1085,9 +1005,28 @@ MSSqlClfsProvider::recoverMessages(qpid:
                                    MessageMap& messageMap,
                                    MessageQueueMap& messageQueueMap)
 {
+    // Read the list of valid queue Ids to ensure that no broken msg->queue
+    // refs get restored.
+    DatabaseConnection *db = initConnection();
+    BlobRecordset rsQueues;
+    rsQueues.open(db, TblQueue);
+    _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+    std::set<uint64_t> validQueues;
+    if (!(p->BOF && p->EndOfFile)) {
+        p->MoveFirst();
+        while (!p->EndOfFile) {
+            uint64_t id = p->Fields->Item["persistenceId"]->Value;
+            validQueues.insert(id);
+            p->MoveNext();
+        }
+    }
     std::map<uint64_t, Transaction::shared_ptr> transMap;
     transactions->recover(transMap);
-    messages.recover(recoverer, messageMap, messageQueueMap, transMap);
+    messages.recover(recoverer,
+                     validQueues,
+                     transMap,
+                     messageMap,
+                     messageQueueMap);
 }
 
 void

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp?rev=1031325&r1=1031324&r2=1031325&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp Thu Nov  4 22:52:10 2010
@@ -24,6 +24,7 @@
 #include "Messages.h"
 #include "Lsn.h"
 #include "qpid/store/StoreException.h"
+#include <boost/foreach.hpp>
 
 namespace qpid {
 namespace store {
@@ -67,37 +68,25 @@ Messages::enqueue(uint64_t msgId, uint64
             THROW_STORE_EXCEPTION("Message does not exist");
         p = i->second;
     }
-    // If transacted, it still needs to be counted as enqueued to ensure it
-    // is not deleted. Remember the transacted operation so it can be properly
-    // resolved later.
-    ::InterlockedIncrement(&p->enqueuedCount);
-    uint64_t transactionId = 0;
-    if (t.get() != 0)
-        transactionId = t->getId();
-    if (transactionId != 0) {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
-        p->transOps[t].push_back(MessageInfo::TRANSACTION_ENQUEUE);
-        t->enroll(msgId);
-    }
-    try {
-        log.recordEnqueue(msgId, queueId, transactionId);
-    }
-    catch (...) {
-        // Undo the record-keeping if the log wasn't written correctly.
-        ::InterlockedDecrement(&p->enqueuedCount);
-        if (transactionId != 0) {
-            t->unenroll(msgId);
-            qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
-            std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
-            std::vector<MessageInfo::TransType>::iterator i;
-            for (i = oplist.begin(); i < oplist.end(); ++i) {
-                if (*i == MessageInfo::TRANSACTION_ENQUEUE) {
-                    oplist.erase(i);
-                    break;
-                }
-            }
+    MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE);
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+        p->where.push_back(loc);
+        uint64_t transactionId = 0;
+        if (t.get() != 0) {
+            transactionId = t->getId();
+            t->enroll(msgId);
+        }
+        try {
+            log.recordEnqueue(msgId, queueId, transactionId);
+        }
+        catch (...) {
+            // Undo the record-keeping if the log wasn't written correctly.
+            if (transactionId != 0)
+                t->unenroll(msgId);
+            p->where.pop_back();
+            throw;
         }
-        throw;
     }
 }
 
@@ -112,38 +101,46 @@ Messages::dequeue(uint64_t msgId, uint64
             THROW_STORE_EXCEPTION("Message does not exist");
         p = i->second;
     }
-    // Remember the transacted operation so it can be properly resolved later.
-    uint64_t transactionId = 0;
-    if (t.get() != 0)
-        transactionId = t->getId();
-    if (transactionId != 0) {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
-        p->transOps[t].push_back(MessageInfo::TRANSACTION_DEQUEUE);
-        t->enroll(msgId);
-    }
-    try {
-        log.recordDequeue(msgId, queueId, transactionId);
-    }
-    catch(...) {
+    {
+        // Locate the 'where' entry for the specified queue. Once this operation
+        // is recorded in the log, update the 'where' entry to reflect it.
+        // Note that an existing entry in 'where' that refers to a transaction
+        // is not eligible for this operation.
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+        std::list<MessageInfo::Location>::iterator i;
+        for (i = p->where.begin(); i != p->where.end(); ++i) {
+            if (i->queueId == queueId && i->transaction.get() == 0)
+                break;
+        }
+        if (i == p->where.end())
+            THROW_STORE_EXCEPTION("Message not on queue");
+        uint64_t transactionId = 0;
+        if (t.get() != 0) {
+            transactionId = t->getId();
+            t->enroll(msgId);
+        }
+        try {
+            log.recordDequeue(msgId, queueId, transactionId);
+        }
+        catch (...) {
+            // Undo the record-keeping if the log wasn't written correctly.
+            if (transactionId != 0)
+                t->unenroll(msgId);
+            throw;
+        }
+        // Ok, logged successfully. If this is a transactional op, note
+        // the transaction. If non-transactional, remove the 'where' entry.
         if (transactionId != 0) {
-            t->unenroll(msgId);
-            qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
-            std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
-            std::vector<MessageInfo::TransType>::iterator i;
-            for (i = oplist.begin(); i < oplist.end(); ++i) {
-                if (*i == MessageInfo::TRANSACTION_DEQUEUE) {
-                    oplist.erase(i);
-                    break;
-                }
-            }
+            i->transaction = t;
+            i->disposition = MessageInfo::TRANSACTION_DEQUEUE;
+        }
+        else {
+            p->where.erase(i);
+            // If the message doesn't exist on any other queues, remove it.
+            if (p->where.empty())
+                remove(msgId);
         }
-        throw;
     }
-
-    // If transacted, leave the reference until the transaction commits.
-    if (transactionId == 0)
-        if (::InterlockedDecrement(&p->enqueuedCount) == 0)
-            remove(msgId);
 }
 
 // Commit a previous provisional enqueue or dequeue of a particular message
@@ -161,22 +158,23 @@ Messages::commit(uint64_t msgId, Transac
         p = i->second;
     }
     {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
-        std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
-        std::vector<MessageInfo::TransType>::iterator i;
-        for (i = oplist.begin(); i < oplist.end(); ++i) {
-            // Transactional dequeues left the ref count alone until commit
-            // while transaction enqueues already incremented it.
-            if (*i == MessageInfo::TRANSACTION_DEQUEUE)
-                ::InterlockedDecrement(&p->enqueuedCount);
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+        std::list<MessageInfo::Location>::iterator i;
+        for (i = p->where.begin(); i != p->where.end(); ++i) {
+            if (i->transaction != t)
+                continue;
+            // Transactional dequeues can now remove the item from the
+            // where list; enqueues just clear the transaction reference.
+            if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE)
+                i = p->where.erase(i);
+            else
+                i->transaction.reset();
         }
-        // Remember, last deref of Transaction::shared_ptr deletes Transaction.
-        p->transOps.erase(t);
     }
     // If committing results in this message having no further enqueue
     // references, delete it. If the delete fails, swallow the exception
     // and let recovery take care of removing it later.
-    if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) {
+    if (p->where.empty()) {
         try {
             remove(msgId);
         }
@@ -199,22 +197,28 @@ Messages::abort(uint64_t msgId, Transact
         p = i->second;
     }
     {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
-        std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
-        std::vector<MessageInfo::TransType>::iterator i;
-        for (i = oplist.begin(); i < oplist.end(); ++i) {
-            // Transactional enqueues incremented the ref count when seen;
-            // while transaction dequeues left it alone.
-            if (*i == MessageInfo::TRANSACTION_ENQUEUE)
-                ::InterlockedDecrement(&p->enqueuedCount);
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+        std::list<MessageInfo::Location>::iterator i = p->where.begin();
+        while (i != p->where.end()) {
+            if (i->transaction != t) {
+                ++i;
+                continue;
+            }
+            // Aborted transactional dequeues result in the message remaining
+            // enqueued like before the operation; enqueues clear the
+            // message from the where list - like the enqueue never happened.
+            if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE)
+                i = p->where.erase(i);
+            else {
+                i->transaction.reset();
+                ++i;
+            }
         }
-        // Remember, last deref of Transaction::shared_ptr deletes Transaction.
-        p->transOps.erase(t);
     }
-    // If committing results in this message having no further enqueue
+    // If aborting results in this message having no further enqueue
     // references, delete it. If the delete fails, swallow the exception
     // and let recovery take care of removing it later.
-    if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) {
+    if (p->where.empty()) {
         try {
             remove(msgId);
         }
@@ -237,9 +241,10 @@ Messages::loadContent(uint64_t msgId,
 // the log.
 void
 Messages::recover(qpid::broker::RecoveryManager& recoverer,
+                  const std::set<uint64_t> &validQueues,
+                  const std::map<uint64_t, Transaction::shared_ptr>& transMap,
                   qpid::store::MessageMap& messageMap,
-                  qpid::store::MessageQueueMap& messageQueueMap,
-                  const std::map<uint64_t, Transaction::shared_ptr>& transMap)
+                  qpid::store::MessageQueueMap& messageQueueMap)
 {
     std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps;
     log.recover(recoverer, messageMap, messageOps);
@@ -264,9 +269,16 @@ Messages::recover(qpid::broker::Recovery
         std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
         for (op = ops.begin(); op != ops.end(); ++op) {
             QueueEntry entry(op->queueId);
+            MessageInfo::Location loc(op->queueId);
             std::string dir =
                 op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue"
                                                         : "dequeue";
+            if (validQueues.find(op->queueId) == validQueues.end()) {
+                QPID_LOG(info,
+                         "Message " << msgId << dir << " on non-existant queue "
+                         << op->queueId << "; dropped");
+                continue;
+            }
             if (op->txnId != 0) {
                 // Be sure to enroll this message in the transaction even if
                 // it has committed or aborted. This ensures that the
@@ -276,22 +288,18 @@ Messages::recover(qpid::broker::Recovery
                 // it couldn't be recovered again.
                 //
                 // Recall what is being reconstructed; 2 things:
-                //   1. This class's 'messages' list which only keeps track
-                //      of how many queues reference each message (though NOT
-                //      which queues) and the transactions each message is
-                //      enrolled in. For this, aborted transactions cause the
-                //      result of the operation to be ignored, but the
-                //      message does need to be enrolled in the transaction
-                //      to properly maintain the transaction references until
-                //      the message is deleted.
-                //   2. The StorageProvider's MessageQueueMap, which DOES
-                //      have an entry for each queue each message is on and
+                //   1. This class's 'messages' list which keeps track
+                //      of the queues each message is on and the transactions
+                //      each message is enrolled in. For this, aborted
+                //      transactions cause the result of the operation to be
+                //      ignored, but the message does need to be enrolled in
+                //      the transaction to properly maintain the transaction
+                //      references until the message is deleted.
+                //   2. The StorageProvider's MessageQueueMap, which also
+                //      has an entry for each queue each message is on and
                 //      its TPL status and associated xid.
                 const Transaction::shared_ptr &t =
                     transMap.find(op->txnId)->second;
-                // Adds t to map, ensuring a reference to Transaction, even if
-                // no ops are added to the TransType vector.
-                std::vector<MessageInfo::TransType>& tOps = m->transOps[t];
                 // Prepared transactions cause the operation to be
                 // provisionally acted on, and the message to be enrolled in
                 // the transaction for when it commits/aborts. This is
@@ -304,13 +312,14 @@ Messages::recover(qpid::broker::Recovery
                         THROW_STORE_EXCEPTION("Invalid transaction state");
                     t->enroll(msgId);
                     entry.xid = tpct->getXid();
+                    loc.transaction = t;
                     if (op->op == MessageLog::RECOVERED_ENQUEUE) {
-                        tOps.push_back(MessageInfo::TRANSACTION_ENQUEUE);
                         entry.tplStatus = QueueEntry::ADDING;
+                        loc.disposition = MessageInfo::TRANSACTION_ENQUEUE;
                     }
                     else {
-                        tOps.push_back(MessageInfo::TRANSACTION_DEQUEUE);
                         entry.tplStatus = QueueEntry::REMOVING;
+                        loc.disposition = MessageInfo::TRANSACTION_DEQUEUE;
                     }
                 }
                 else if (t->getState() != Transaction::TRANS_COMMITTED) {
@@ -329,42 +338,107 @@ Messages::recover(qpid::broker::Recovery
             // it if the current op is non-transactional; if it's a prepared
             // transaction then replace the existing entry with the current
             // one that notes the message is enqueued but being removed under
-            // a prepared transaciton.
+            // a prepared transaction.
             QPID_LOG(debug, dir + " at queue " << entry.queueId);
             if (op->op == MessageLog::RECOVERED_ENQUEUE) {
                 entries.push_back(entry);
+                m->where.push_back(loc);
             }
             else {
                 std::vector<QueueEntry>::iterator i = entries.begin();
                 while (i != entries.end()) {
                     if (i->queueId == entry.queueId) {
-                        *i = entry;
+                        if (entry.tplStatus != QueueEntry::NONE)
+                            *i = entry;
+                        else
+                            entries.erase(i);
                         break;
                     }
                     ++i;
                 }
+                std::list<MessageInfo::Location>::iterator w = m->where.begin();
+                while (w != m->where.end()) {
+                    if (w->queueId == loc.queueId) {
+                        if (loc.transaction.get() != 0)
+                            *w = loc;
+                        else
+                            m->where.erase(w);
+                    }
+                }
             }
         }
-        // Now that all the queue entries have been set correctly, the
-        // enqueuedCount that MessageInfo keeps track of is simply the
-        // number of queue map entries. If there are none, add this
-        // message to the homeless list to be deleted from the log after
-        // the recovery is done.
-        if ((m->enqueuedCount = entries.size()) == 0) {
+        // Now that all the queue entries have been set correctly, see if
+        // there are any entries; they may have all been removed during
+        // recovery. If there are none, add this message to the homeless
+        // list to be deleted from the log after the recovery is done.
+        if (m->where.size() == 0) {
             homeless.push_back(msgId);
             messageMap.erase(msgId);
             messageQueueMap.erase(msgId);
         }
-        std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
-        messages.insert(p);
+        else {
+            std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
+            messages.insert(p);
+        }
     }
     QPID_LOG(debug, "Message log recovery done.");
     // Done! Ok, go back and delete all the homeless messages.
-    for (std::vector<uint64_t>::iterator i = homeless.begin();
-         i != homeless.end();
-         ++i) {
-        QPID_LOG(debug, "Deleting homeless message " << *i);
-        remove(*i);
+    BOOST_FOREACH(uint64_t msg, homeless) {
+        QPID_LOG(debug, "Deleting homeless message " << msg);
+        remove(msg);
+    }
+}
+
+// Expunge is called when a queue is deleted. All references to that
+// queue must be expunged from all messages. 'Dequeue' log records are
+// written for each queue entry removed, but any errors are swallowed.
+// On recovery there's a list of valid queues passed in. The deleted
+// queue will not be on that list so if any references to it are
+// recovered they'll get weeded out then.
+void
+Messages::expunge(uint64_t queueId)
+{
+    std::vector<uint64_t> toBeDeleted;   // Messages to be deleted later.
+
+    {
+        // Lock everybody out since all messages are possibly in play.
+        // There also may be other threads already working on particular
+        // messages so individual message mutex still must be acquired.
+        qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+        MessageMap::iterator m;
+        for (m = messages.begin(); m != messages.end(); ++m) {
+            MessageInfo::shared_ptr p = m->second;
+            {
+                qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock);
+                std::list<MessageInfo::Location>::iterator i = p->where.begin();
+                while (i != p->where.end()) {
+                    if (i->queueId != queueId) {
+                        ++i;
+                        continue;
+                    }
+                    // If this entry is involved in a transaction, unenroll it.
+                    // Then remove the entry.
+                    if (i->transaction.get() != 0)
+                        i->transaction->unenroll(m->first);
+                    i = p->where.erase(i);
+                    try {
+                        log.recordDequeue(m->first, queueId, 0);
+                    }
+                    catch(...) {
+                    }
+                }
+                if (p->where.size() == 0)
+                    toBeDeleted.push_back(m->first);
+            }
+        }
+    }
+    // Swallow any exceptions during this; don't care. Recover it later
+    // if needed.
+    try {
+        BOOST_FOREACH(uint64_t msg, toBeDeleted)
+            remove(msg);
+    }
+    catch(...) {
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h?rev=1031325&r1=1031324&r2=1031325&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Thu Nov  4 22:52:10 2010
@@ -24,6 +24,7 @@
 
 #include <windows.h>
 #include <map>
+#include <set>
 #include <vector>
 #include <boost/intrusive_ptr.hpp>
 #include <qpid/broker/PersistableMessage.h>
@@ -46,9 +47,38 @@ class Messages {
         // Keep a list of transactional operations this message is
         // referenced in. When the transaction changes/finalizes these all
         // need to be acted on.
-        typedef enum { TRANSACTION_ENQUEUE, TRANSACTION_DEQUEUE } TransType;
+        typedef enum { TRANSACTION_NONE = 0,
+                       TRANSACTION_ENQUEUE,
+                       TRANSACTION_DEQUEUE } TransType;
+#if 0
         std::map<Transaction::shared_ptr, std::vector<TransType> > transOps;
         qpid::sys::Mutex transOpsLock;
+#endif
+      // Think what I need is a list of "where is this message" - queue id,
+      // transaction ref, what kind of trans op (enq/deq). Then "remove all
+      // queue refs" can search through all messages looking for queue ids
+      // and undo them. Write "remove from queue" record to log. Also need to
+      // add "remove from queue" to recovery.
+        struct Location {
+            uint64_t queueId;
+            Transaction::shared_ptr transaction;
+            TransType disposition;
+
+            Location(uint64_t q)
+                : queueId(q), transaction(), disposition(TRANSACTION_NONE) {}
+            Location(uint64_t q, Transaction::shared_ptr& t, TransType d)
+                : queueId(q), transaction(t), disposition(d) {}
+        };
+        qpid::sys::Mutex whereLock;
+        std::list<Location> where;
+        // The transactions vector just keeps a shared_ptr to each
+        // Transaction this message was involved in, regardless of the
+        // disposition or transaction state. Keeping a valid shared_ptr
+        // prevents the Transaction from being deleted. As long as there
+        // are any messages that referred to a transaction, that
+        // transaction's state needs to be known so the message disposition
+        // can be correctly recovered if needed.
+        std::vector<Transaction::shared_ptr> transactions;
 
         typedef boost::shared_ptr<MessageInfo> shared_ptr;
 
@@ -96,12 +126,17 @@ public:
                      uint64_t offset,
                      uint32_t length);
 
+    // Expunge is called when a queue is deleted. All references to that
+    // queue must be expunged from all messages.
+    void expunge(uint64_t queueId);
+
     // Recover the current set of messages and where they're queued from
     // the log.
     void recover(qpid::broker::RecoveryManager& recoverer,
+                 const std::set<uint64_t> &validQueues,
+                 const std::map<uint64_t, Transaction::shared_ptr>& transMap,
                  qpid::store::MessageMap& messageMap,
-                 qpid::store::MessageQueueMap& messageQueueMap,
-                 const std::map<uint64_t, Transaction::shared_ptr>& transMap);
+                 qpid::store::MessageQueueMap& messageQueueMap);
 };
 
 }}}  // namespace qpid::store::ms_clfs



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org