You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2010/10/22 01:09:01 UTC

svn commit: r1026175 [2/2] - in /qpid/trunk/qpid/cpp/src/qpid/store: ./ ms-clfs/ ms-sql/

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp Thu Oct 21 23:09:00 2010
@@ -0,0 +1,383 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/log/Statement.h>
+
+#include "Messages.h"
+#include "Lsn.h"
+#include "qpid/store/StoreException.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+void
+Messages::openLog(const std::string& path, const Log::TuningParameters& params)
+{
+    log.open (path, params);
+}
+
+void
+Messages::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+    uint64_t id = log.add(msg);
+    msg->setPersistenceId(id);
+    std::auto_ptr<MessageInfo> autom(new MessageInfo);
+    MessageInfo::shared_ptr m(autom);
+    std::pair<uint64_t, MessageInfo::shared_ptr> p(id, m);
+    {
+        qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+        messages.insert(p);
+        // If there's only this one message there, move the tail to it.
+        // This prevents the log from continually growing when messages
+        // are added and removed one at a time.
+        if (messages.size() == 1) {
+            CLFS_LSN newTail = idToLsn(id);
+            log.moveTail(newTail);
+        }
+    }
+}
+
+void
+Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
+{
+    MessageInfo::shared_ptr p;
+    {
+        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+        MessageMap::const_iterator i = messages.find(msgId);
+        if (i == messages.end())
+            THROW_STORE_EXCEPTION("Message does not exist");
+        p = i->second;
+    }
+    // If transacted, it still needs to be counted as enqueued to ensure it
+    // is not deleted. Remember the transacted operation so it can be properly
+    // resolved later.
+    ::InterlockedIncrement(&p->enqueuedCount);
+    uint64_t transactionId = 0;
+    if (t.get() != 0)
+        transactionId = t->getId();
+    if (transactionId != 0) {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
+        p->transOps[t].push_back(MessageInfo::TRANSACTION_ENQUEUE);
+        t->enroll(msgId);
+    }
+    try {
+        log.recordEnqueue(msgId, queueId, transactionId);
+    }
+    catch (...) {
+        // Undo the record-keeping if the log wasn't written correctly.
+        ::InterlockedDecrement(&p->enqueuedCount);
+        if (transactionId != 0) {
+            t->unenroll(msgId);
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
+            std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
+            std::vector<MessageInfo::TransType>::iterator i;
+            for (i = oplist.begin(); i < oplist.end(); ++i) {
+                if (*i == MessageInfo::TRANSACTION_ENQUEUE) {
+                    oplist.erase(i);
+                    break;
+                }
+            }
+        }
+        throw;
+    }
+}
+
+void
+Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
+{
+    MessageInfo::shared_ptr p;
+    {
+        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+        MessageMap::const_iterator i = messages.find(msgId);
+        if (i == messages.end())
+            THROW_STORE_EXCEPTION("Message does not exist");
+        p = i->second;
+    }
+    // Remember the transacted operation so it can be properly resolved later.
+    uint64_t transactionId = 0;
+    if (t.get() != 0)
+        transactionId = t->getId();
+    if (transactionId != 0) {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
+        p->transOps[t].push_back(MessageInfo::TRANSACTION_DEQUEUE);
+        t->enroll(msgId);
+    }
+    try {
+        log.recordDequeue(msgId, queueId, transactionId);
+    }
+    catch(...) {
+        if (transactionId != 0) {
+            t->unenroll(msgId);
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
+            std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
+            std::vector<MessageInfo::TransType>::iterator i;
+            for (i = oplist.begin(); i < oplist.end(); ++i) {
+                if (*i == MessageInfo::TRANSACTION_DEQUEUE) {
+                    oplist.erase(i);
+                    break;
+                }
+            }
+        }
+        throw;
+    }
+
+    // If transacted, leave the reference until the transaction commits.
+    if (transactionId == 0)
+        if (::InterlockedDecrement(&p->enqueuedCount) == 0)
+            remove(msgId);
+}
+
+// Commit a previous provisional enqueue or dequeue of a particular message
+// actions under a specified transaction. If this results in the message's
+// being removed from all queues, it is deleted.
+void
+Messages::commit(uint64_t msgId, Transaction::shared_ptr& t)
+{
+    MessageInfo::shared_ptr p;
+    {
+        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+        MessageMap::const_iterator i = messages.find(msgId);
+        if (i == messages.end())
+            THROW_STORE_EXCEPTION("Message does not exist");
+        p = i->second;
+    }
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
+        std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
+        std::vector<MessageInfo::TransType>::iterator i;
+        for (i = oplist.begin(); i < oplist.end(); ++i) {
+            // Transactional dequeues left the ref count alone until commit
+            // while transaction enqueues already incremented it.
+            if (*i == MessageInfo::TRANSACTION_DEQUEUE)
+                ::InterlockedDecrement(&p->enqueuedCount);
+        }
+        // Remember, last deref of Transaction::shared_ptr deletes Transaction.
+        p->transOps.erase(t);
+    }
+    // If committing results in this message having no further enqueue
+    // references, delete it. If the delete fails, swallow the exception
+    // and let recovery take care of removing it later.
+    if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) {
+        try {
+            remove(msgId);
+        }
+        catch(...) {}
+    }
+}
+
+// Abort a previous provisional enqueue or dequeue of a particular message
+// actions under a specified transaction. If this results in the message's
+// being removed from all queues, it is deleted.
+void
+Messages::abort(uint64_t msgId, Transaction::shared_ptr& t)
+{
+    MessageInfo::shared_ptr p;
+    {
+        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+        MessageMap::const_iterator i = messages.find(msgId);
+        if (i == messages.end())
+            THROW_STORE_EXCEPTION("Message does not exist");
+        p = i->second;
+    }
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock);
+        std::vector<MessageInfo::TransType> &oplist = p->transOps[t];
+        std::vector<MessageInfo::TransType>::iterator i;
+        for (i = oplist.begin(); i < oplist.end(); ++i) {
+            // Transactional enqueues incremented the ref count when seen;
+            // while transaction dequeues left it alone.
+            if (*i == MessageInfo::TRANSACTION_ENQUEUE)
+                ::InterlockedDecrement(&p->enqueuedCount);
+        }
+        // Remember, last deref of Transaction::shared_ptr deletes Transaction.
+        p->transOps.erase(t);
+    }
+    // If committing results in this message having no further enqueue
+    // references, delete it. If the delete fails, swallow the exception
+    // and let recovery take care of removing it later.
+    if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) {
+        try {
+            remove(msgId);
+        }
+        catch(...) {}
+    }
+}
+
+// Recover the current set of messages and where they're queued from
+// the log.
+void
+Messages::recover(qpid::broker::RecoveryManager& recoverer,
+                  qpid::store::MessageMap& messageMap,
+                  qpid::store::MessageQueueMap& messageQueueMap,
+                  const std::map<uint64_t, Transaction::shared_ptr>& transMap)
+{
+    std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps;
+    log.recover(recoverer, messageMap, messageOps);
+    // Now read through the messageOps replaying the operations with the
+    // knowledge of which transactions committed, aborted, etc. A transaction
+    // should not be deleted until there are no messages referencing it so
+    // a message operation with a transaction id not found in transMap is
+    // a serious problem.
+    QPID_LOG(debug, "Beginning CLFS-recovered message operation replay");
+    // Keep track of any messages that are recovered from the log but don't
+    // have any place to be. This can happen, for example, if the broker
+    // crashes while logging a message deletion. After all the recovery is
+    // done, delete all the homeless messages.
+    std::vector<uint64_t> homeless;
+    std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> >::const_iterator msg;
+    for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) {
+        uint64_t msgId = msg->first;
+        const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second;
+        QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " ops");
+        MessageInfo::shared_ptr m(new MessageInfo);
+        std::vector<QueueEntry>& entries = messageQueueMap[msgId];
+        std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
+        for (op = ops.begin(); op != ops.end(); ++op) {
+            QueueEntry entry(op->queueId);
+            std::string dir =
+                op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue"
+                                                        : "dequeue";
+            if (op->txnId != 0) {
+                // Be sure to enroll this message in the transaction even if
+                // it has committed or aborted. This ensures that the
+                // transaction isn't removed from the log while finalizing the
+                // recovery. If it were to be removed and the broker failed
+                // again before removing this message during normal operation,
+                // it couldn't be recovered again.
+                //
+                // Recall what is being reconstructed; 2 things:
+                //   1. This class's 'messages' list which only keeps track
+                //      of how many queues reference each message (though NOT
+                //      which queues) and the transactions each message is
+                //      enrolled in. For this, aborted transactions cause the
+                //      result of the operation to be ignored, but the
+                //      message does need to be enrolled in the transaction
+                //      to properly maintain the transaction references until
+                //      the message is deleted.
+                //   2. The StorageProvider's MessageQueueMap, which DOES
+                //      have an entry for each queue each message is on and
+                //      its TPL status and associated xid.
+                const Transaction::shared_ptr &t =
+                    transMap.find(op->txnId)->second;
+                // Adds t to map, ensuring a reference to Transaction, even if
+                // no ops are added to the TransType vector.
+                std::vector<MessageInfo::TransType>& tOps = m->transOps[t];
+                // Prepared transactions cause the operation to be
+                // provisionally acted on, and the message to be enrolled in
+                // the transaction for when it commits/aborts. This is
+                // noted in the QueueEntry for the StorageProvider's map.
+                if (t->getState() == Transaction::TRANS_PREPARED) {
+                    QPID_LOG(debug, dir << " for queue " << op->queueId <<
+                                    ", prepared txn " << op->txnId);
+                    TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+                    if (tpct.get() == 0)
+                        THROW_STORE_EXCEPTION("Invalid transaction state");
+                    t->enroll(msgId);
+                    entry.xid = tpct->getXid();
+                    if (op->op == MessageLog::RECOVERED_ENQUEUE) {
+                        tOps.push_back(MessageInfo::TRANSACTION_ENQUEUE);
+                        entry.tplStatus = QueueEntry::ADDING;
+                    }
+                    else {
+                        tOps.push_back(MessageInfo::TRANSACTION_DEQUEUE);
+                        entry.tplStatus = QueueEntry::REMOVING;
+                    }
+                }
+                else if (t->getState() != Transaction::TRANS_COMMITTED) {
+                    QPID_LOG(debug, dir << " for queue " << op->queueId <<
+                                    ", txn " << op->txnId << ", rolling back");
+                    continue;
+                }
+            }
+            // Here for non-transactional and prepared transactional operations
+            // to set up the messageQueueMap entries. Note that at this point
+            // a committed transactional operation looks like a
+            // non-transactional one as far as the QueueEntry is
+            // concerned - just do it. If this is an entry enqueuing a
+            // message, just add it to the entries list. If it's a dequeue
+            // operation, locate the matching entry for the queue and delete
+            // it if the current op is non-transactional; if it's a prepared
+            // transaction then replace the existing entry with the current
+            // one that notes the message is enqueued but being removed under
+            // a prepared transaciton.
+            QPID_LOG(debug, dir + " at queue " << entry.queueId);
+            if (op->op == MessageLog::RECOVERED_ENQUEUE) {
+                entries.push_back(entry);
+            }
+            else {
+                std::vector<QueueEntry>::iterator i = entries.begin();
+                while (i != entries.end()) {
+                    if (i->queueId == entry.queueId) {
+                        *i = entry;
+                        break;
+                    }
+                    ++i;
+                }
+            }
+        }
+        // Now that all the queue entries have been set correctly, the
+        // enqueuedCount that MessageInfo keeps track of is simply the
+        // number of queue map entries. If there are none, add this
+        // message to the homeless list to be deleted from the log after
+        // the recovery is done.
+        if ((m->enqueuedCount = entries.size()) == 0) {
+            homeless.push_back(msgId);
+            messageMap.erase(msgId);
+            messageQueueMap.erase(msgId);
+        }
+        std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
+        messages.insert(p);
+    }
+    QPID_LOG(debug, "Message log recovery done.");
+    // Done! Ok, go back and delete all the homeless messages.
+    for (std::vector<uint64_t>::iterator i = homeless.begin();
+         i != homeless.end();
+         ++i) {
+        QPID_LOG(debug, "Deleting homeless message " << *i);
+        remove(*i);
+    }
+}
+
+// Remove a specified message from those controlled by this object.
+void
+Messages::remove(uint64_t messageId)
+{
+    uint64_t newFirstId = 0;
+    {
+        qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+        messages.erase(messageId);
+        // May have deleted the first entry; if so the log can release that.
+        // If this message being deleted results in an empty list of
+        // messages, move the tail up to this message's LSN. This may
+        // result in one or more messages being stranded in the log
+        // until there's more activity. If a restart happens while these
+        // unneeded log records are there, the presence of the MessageDelete
+        // entry will cause the message(s) to be ignored anyway.
+        if (messages.empty())
+            newFirstId = messageId;
+        else if (messages.begin()->first > messageId)
+            newFirstId = messages.begin()->first;
+    }
+    log.deleteMessage(messageId, newFirstId);
+}
+
+}}}  // namespace qpid::store::ms_clfs

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Thu Oct 21 23:09:00 2010
@@ -0,0 +1,103 @@
+#ifndef QPID_STORE_MSCLFS_MESSAGES_H
+#define QPID_STORE_MSCLFS_MESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <map>
+#include <vector>
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/sys/Mutex.h>
+
+#include "MessageLog.h"
+#include "Transaction.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Messages {
+
+    struct MessageInfo {
+        // How many queues this message is on, whether actually (non-transacted)
+        // or provisionally (included in a non-yet-committed transaction).
+        volatile LONG enqueuedCount;
+
+        // Keep a list of transactional operations this message is
+        // referenced in. When the transaction changes/finalizes these all
+        // need to be acted on.
+        typedef enum { TRANSACTION_ENQUEUE, TRANSACTION_DEQUEUE } TransType;
+        std::map<Transaction::shared_ptr, std::vector<TransType> > transOps;
+        qpid::sys::Mutex transOpsLock;
+
+        typedef boost::shared_ptr<MessageInfo> shared_ptr;
+
+        MessageInfo()
+            : enqueuedCount(0) { /*latestLsn.Internal = 0;*/ }
+    };
+
+    qpid::sys::RWlock lock;
+    typedef std::map<uint64_t, MessageInfo::shared_ptr> MessageMap;
+    MessageMap messages;
+    MessageLog log;
+
+    // Remove a specified message from those controlled by this object.
+    void remove(uint64_t messageId);
+
+public:
+    void openLog(const std::string& path, const Log::TuningParameters& params);
+
+    // Add the specified message to the log and list of known messages.
+    // Upon successful return the message's persistenceId is set.
+    void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+    // Add the specified queue to the message's list of places it is
+    // enqueued.
+    void enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t);
+
+    // Remove the specified queue from the message's list of places it is
+    // enqueued. If there are no other queues holding the message, it is
+    // deleted.
+    void dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t);
+
+    // Commit a previous provisional enqueue or dequeue of a particular message
+    // actions under a specified transaction. If this results in the message's
+    // being removed from all queues, it is deleted.
+    void commit(uint64_t msgId, Transaction::shared_ptr& transaction);
+
+    // Abort a previous provisional enqueue or dequeue of a particular message
+    // actions under a specified transaction. If this results in the message's
+    // being removed from all queues, it is deleted.
+    void abort(uint64_t msgId, Transaction::shared_ptr& transaction);
+
+    // Recover the current set of messages and where they're queued from
+    // the log.
+    void recover(qpid::broker::RecoveryManager& recoverer,
+                 qpid::store::MessageMap& messageMap,
+                 qpid::store::MessageQueueMap& messageQueueMap,
+                 const std::map<uint64_t, Transaction::shared_ptr>& transMap);
+};
+
+}}}  // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_MESSAGES_H */

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp Thu Oct 21 23:09:00 2010
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Transaction.h"
+#include "Messages.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+Transaction::~Transaction()
+{
+    // Transactions that are recovered then found to be deleted get destroyed
+    // but need not be logged.
+    if (state != TRANS_DELETED)
+        log->deleteTransaction(id);
+}
+
+void
+Transaction::enroll(uint64_t msgId)
+{
+    qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock);
+    enrolledMessages.push_back(msgId);
+}
+
+void
+Transaction::unenroll(uint64_t msgId)
+{
+    qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock);
+    for (std::vector<uint64_t>::iterator i = enrolledMessages.begin();
+         i < enrolledMessages.end();
+         ++i) {
+        if (*i == msgId) {
+            enrolledMessages.erase(i);
+            break;
+        }
+    }
+}
+
+void
+Transaction::abort(Messages& messages)
+{
+    log->recordAbort(id);
+    for (size_t i = 0; i < enrolledMessages.size(); ++i)
+        messages.abort(enrolledMessages[i], shared_from_this());
+    state = TRANS_ABORTED;
+}
+
+void
+Transaction::commit(Messages& messages)
+{
+    log->recordCommit(id);
+    for (size_t i = 0; i < enrolledMessages.size(); ++i)
+        messages.commit(enrolledMessages[i], shared_from_this());
+    state = TRANS_COMMITTED;
+}
+
+void
+TPCTransaction::prepare(void)
+{
+    log->recordPrepare(id);
+    state = TRANS_PREPARED;
+}
+
+}}}  // namespace qpid::store::ms_clfs

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h Thu Oct 21 23:09:00 2010
@@ -0,0 +1,146 @@
+#ifndef QPID_STORE_MSCLFS_TRANSACTION_H
+#define QPID_STORE_MSCLFS_TRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/TransactionalStore.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+#include "TransactionLog.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Messages;
+
+/**
+ * @class Transaction
+ *
+ * Class representing an AMQP transaction. This is used around a set of
+ * enqueue and dequeue operations that occur when the broker is acting
+ * on a transaction commit/abort from the client.
+ * This class is what the store uses internally to implement things a
+ * transaction needs; the broker knows about TransactionContext, which
+ * holds a pointer to Transaction.
+ *
+ * NOTE: All references to Transactions (and TPCTransactions, below) are
+ * through Boost shared_ptr instances. All messages enrolled in a transaction
+ * hold a shared_ptr. Thus, a Transaction object will not be deleted until all
+ * messages holding a reference to it are deleted. This fact is also used
+ * during recovery to automatically clean up and delete any Transaction without
+ * messages left referring to it.
+ */
+class Transaction : public boost::enable_shared_from_this<Transaction> {
+private:
+    // TransactionLog has to create all Transaction instances.
+    Transaction() {}
+
+public:
+
+    typedef boost::shared_ptr<Transaction> shared_ptr;
+    typedef enum { TRANS_OPEN = 1,
+                   TRANS_PREPARED,
+                   TRANS_ABORTED,
+                   TRANS_COMMITTED,
+                   TRANS_DELETED    } State;
+
+    virtual ~Transaction();
+
+    uint64_t getId() { return id; }
+    State getState() { return state; }
+
+    void enroll(uint64_t msgId);
+    void unenroll(uint64_t msgId);   // For failed ops, not normal end-of-trans
+
+    void abort(Messages& messages);
+    void commit(Messages& messages);
+
+protected:
+    friend class TransactionLog;
+    Transaction(uint64_t _id, const TransactionLog::shared_ptr& _log)
+        : id(_id), state(TRANS_OPEN), log(_log) {}
+
+    uint64_t id;
+    State state;
+    TransactionLog::shared_ptr log;
+    std::vector<uint64_t> enrolledMessages;
+    qpid::sys::RWlock enrollLock;
+};
+
+class TransactionContext : public qpid::broker::TransactionContext {
+    Transaction::shared_ptr transaction;
+
+public:
+    TransactionContext(const Transaction::shared_ptr& _transaction)
+        : transaction(_transaction) {}
+
+    virtual Transaction::shared_ptr& getTransaction() { return transaction; }
+};
+
+/**
+ * @class TPCTransaction
+ *
+ * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is
+ * used around a set of enqueue and dequeue operations that occur when the
+ * broker is acting on a transaction prepare/commit/abort from the client.
+ * This class is what the store uses internally to implement things a
+ * transaction needs; the broker knows about TPCTransactionContext, which
+ * holds a pointer to TPCTransaction.
+ */
+class TPCTransaction : public Transaction {
+
+    friend class TransactionLog;
+    TPCTransaction(uint64_t _id,
+                   const TransactionLog::shared_ptr& _log,
+                   const std::string& _xid)
+        : Transaction(_id, _log), xid(_xid) {}
+
+    std::string  xid;
+
+public: 
+    typedef boost::shared_ptr<TPCTransaction> shared_ptr;
+
+    virtual ~TPCTransaction() {}
+
+    void prepare();
+    bool isPrepared() const { return state == TRANS_PREPARED; }
+
+    const std::string& getXid(void) const { return xid; }
+};
+
+class TPCTransactionContext : public qpid::broker::TPCTransactionContext {
+    TPCTransaction::shared_ptr transaction;
+
+public:
+    TPCTransactionContext(const TPCTransaction::shared_ptr& _transaction)
+        : transaction(_transaction) {}
+
+    virtual TPCTransaction::shared_ptr& getTransaction() { return transaction; }
+};
+
+}}}  // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_TRANSACTION_H */

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp Thu Oct 21 23:09:00 2010
@@ -0,0 +1,415 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "TransactionLog.h"
+#include "Transaction.h"
+#include "Lsn.h"
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start.
+enum TransactionEntryType {
+    TransactionStartDtxEntry         = 1,
+    TransactionStartTxEntry          = 2,
+    TransactionPrepareEntry          = 3,
+    TransactionCommitEntry           = 4,
+    TransactionAbortEntry            = 5,
+    TransactionDeleteEntry           = 6
+};
+// The only thing that really takes up space in transaction records is the
+// xid. Max xid length is in the neighborhood of 600 bytes. Leave some room.
+static const uint32_t MaxTransactionContentLength = 1024;
+
+// Dtx-Start
+struct TransactionStartDtx {
+    TransactionEntryType type;
+    uint32_t length;
+    char content[MaxTransactionContentLength];
+
+    TransactionStartDtx()
+        : type(TransactionStartDtxEntry), length(0) {}
+};
+// Tx-Start
+struct TransactionStartTx {
+    TransactionEntryType type;
+
+    TransactionStartTx()
+        : type(TransactionStartTxEntry) {}
+};
+// Prepare
+struct TransactionPrepare {
+    TransactionEntryType type;
+
+    TransactionPrepare()
+        : type(TransactionPrepareEntry) {}
+};
+// Commit
+struct TransactionCommit {
+    TransactionEntryType type;
+
+    TransactionCommit()
+        : type(TransactionCommitEntry) {}
+};
+// Abort
+struct TransactionAbort {
+    TransactionEntryType type;
+
+    TransactionAbort()
+        : type(TransactionAbortEntry) {}
+};
+// Delete
+struct TransactionDelete {
+    TransactionEntryType type;
+
+    TransactionDelete()
+        : type(TransactionDeleteEntry) {}
+};
+
+}   // namespace
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+uint32_t
+TransactionLog::marshallingBufferSize()
+{
+    size_t biggestNeed = sizeof(TransactionStartDtx);
+    uint32_t defSize = static_cast<uint32_t>(biggestNeed);
+    uint32_t minSize = Log::marshallingBufferSize();
+    if (defSize <= minSize)
+        return minSize;
+    // Round up to multiple of minSize
+    return (defSize + minSize) / minSize * minSize;
+}
+
+// Get a new Transaction
+boost::shared_ptr<Transaction>
+TransactionLog::begin()
+{
+    TransactionStartTx entry;
+    CLFS_LSN location;
+    uint64_t id;
+    uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+    location = write(&entry, entryLength);
+    try {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+        id = lsnToId(location);
+        std::auto_ptr<Transaction> t(new Transaction(id, shared_from_this()));
+        boost::shared_ptr<Transaction> t2(t);
+        boost::weak_ptr<Transaction> weak_t2(t2);
+        {
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+            validIds[id] = weak_t2;
+        }
+        return t2;
+    }
+    catch(...) {
+        deleteTransaction(id);
+        throw;
+    }
+}
+
+// Get a new TPCTransaction
+boost::shared_ptr<TPCTransaction>
+TransactionLog::begin(const std::string& xid)
+{
+    TransactionStartDtx entry;
+    CLFS_LSN location;
+    uint64_t id;
+    uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+    entry.length = static_cast<uint32_t>(xid.length());
+    memcpy_s(entry.content, sizeof(entry.content),
+             xid.c_str(), xid.length());
+    entryLength -= (sizeof(entry.content) - entry.length);
+    location = write(&entry, entryLength);
+    try {
+        id = lsnToId(location);
+        std::auto_ptr<TPCTransaction> t(new TPCTransaction(id,
+                                                           shared_from_this(),
+                                                           xid));
+        boost::shared_ptr<TPCTransaction> t2(t);
+        boost::weak_ptr<Transaction> weak_t2(t2);
+        {
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+            validIds[id] = weak_t2;
+        }
+        return t2;
+    }
+    catch(...) {
+        deleteTransaction(id);
+        throw;
+    }
+}
+
+void
+TransactionLog::recordPrepare(uint64_t transId)
+{
+    TransactionPrepare entry;
+    CLFS_LSN transLsn = idToLsn(transId);
+    write(&entry, sizeof(entry), &transLsn);
+}
+
+void
+TransactionLog::recordCommit(uint64_t transId)
+{
+    TransactionCommit entry;
+    CLFS_LSN transLsn = idToLsn(transId);
+    write(&entry, sizeof(entry), &transLsn);
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+        validIds[transId].reset();
+    }
+}
+
+void
+TransactionLog::recordAbort(uint64_t transId)
+{
+    TransactionAbort entry;
+    CLFS_LSN transLsn = idToLsn(transId);
+    write(&entry, sizeof(entry), &transLsn);
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+        validIds[transId].reset();
+    }
+}
+
+void
+TransactionLog::deleteTransaction(uint64_t transId)
+{
+    uint64_t newFirstId = 0;
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+        validIds.erase(transId);
+        // May have deleted the first entry; if so the log can release that.
+        // If this deletion results in an empty list of transactions,
+        // move the tail up to this transaction's LSN. This may result in
+        // one or more transactions being stranded in the log until there's
+        // more activity. If a restart happens while these unneeded log
+        // records are there, the presence of the TransactionDelete
+        // entry will cause them to be ignored anyway.
+        if (validIds.empty())
+            newFirstId = transId;
+        else if (validIds.begin()->first > transId)
+            newFirstId = validIds.begin()->first;
+    }
+    TransactionDelete deleteEntry;
+    CLFS_LSN transLsn = idToLsn(transId);
+    write(&deleteEntry, sizeof(deleteEntry), &transLsn);
+    if (newFirstId != 0)
+        moveTail(idToLsn(newFirstId));
+
+}
+
+void
+TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared_ptr>& preparedMap)
+{
+    // Go through all the known transactions; if the transaction is still
+    // valid (open or prepared) it will have weak_ptr to the Transaction.
+    // If it can be downcast and has a state of TRANS_PREPARED, add to the map.
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+    std::map<uint64_t, boost::weak_ptr<Transaction> >::const_iterator i;
+    for (i = validIds.begin(); i != validIds.end(); ++i) {
+        Transaction::shared_ptr t = i->second.lock();
+        if (t.get() == 0)
+            continue;
+        TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+        if (tpct.get() == 0)
+            continue;
+        if (tpct->state == Transaction::TRANS_PREPARED)
+            preparedMap[tpct->getXid()] = tpct;
+    }
+}
+
+void
+TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
+{
+    // Note that there may be transaction refs in the log which are deleted,
+    // so be sure to only add transactions at Start records, and ignore those
+    // that don't have an existing message record.
+    // Get the base LSN - that's how to say "start reading at the beginning"
+    CLFS_INFORMATION info;
+    ULONG infoLength = sizeof (info);
+    BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+    // Pointers for the various record types that can be assigned in the
+    // reading loop below.
+    TransactionStartDtx *startDtxEntry;
+    TransactionStartTx *startTxEntry;
+
+    PVOID recordPointer;
+    ULONG recordLength;
+    CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+    CLFS_LSN transLsn, current, undoNext;
+    PVOID readContext;
+    uint64_t transId;
+    // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+    // via a parameter.
+    current = info.BaseLsn;
+    ok = ::ReadLogRecord(marshal,
+                         &info.BaseLsn,
+                         ClfsContextForward,
+                         &recordPointer,
+                         &recordLength,
+                         &recordType,
+                         &undoNext,
+                         &transLsn,
+                         &readContext,
+                         0);
+
+    std::auto_ptr<Transaction> tPtr;
+    std::auto_ptr<TPCTransaction> tpcPtr;
+    while (ok) {
+        std::string xid;
+
+        // All the record types this class writes have a TransactionEntryType
+        // in the beginning. Based on that, do what's needed.
+        TransactionEntryType *t =
+            reinterpret_cast<TransactionEntryType *>(recordPointer);
+        switch(*t) {
+        case TransactionStartDtxEntry:
+            startDtxEntry =
+                reinterpret_cast<TransactionStartDtx *>(recordPointer);
+            transId = lsnToId(current);
+            QPID_LOG(debug, "Dtx start, id " << transId);
+            xid.assign(startDtxEntry->content, startDtxEntry->length);
+            tpcPtr.reset(new TPCTransaction(transId, shared_from_this(), xid));
+            transMap[transId] = boost::shared_ptr<TPCTransaction>(tpcPtr);
+            break;
+        case TransactionStartTxEntry:
+            startTxEntry =
+                reinterpret_cast<TransactionStartTx *>(recordPointer);
+            transId = lsnToId(current);
+            QPID_LOG(debug, "Tx start, id " << transId);
+            tPtr.reset(new Transaction(transId, shared_from_this()));
+            transMap[transId] = boost::shared_ptr<Transaction>(tPtr);
+            break;
+        case TransactionPrepareEntry:
+            transId = lsnToId(transLsn);
+            QPID_LOG(debug, "Dtx prepare, id " << transId);
+            if (transMap.find(transId) == transMap.end()) {
+                QPID_LOG(debug,
+                         "Dtx " << transId << " doesn't exist; discarded");
+            }
+            else {
+                transMap[transId]->state = Transaction::TRANS_PREPARED;
+            }
+            break;
+        case TransactionCommitEntry:
+            transId = lsnToId(transLsn);
+            QPID_LOG(debug, "Txn commit, id " << transId);
+            if (transMap.find(transId) == transMap.end()) {
+                QPID_LOG(debug,
+                         "Txn " << transId << " doesn't exist; discarded");
+            }
+            else {
+                transMap[transId]->state = Transaction::TRANS_COMMITTED;
+            }
+            break;
+        case TransactionAbortEntry:
+            transId = lsnToId(transLsn);
+            QPID_LOG(debug, "Txn abort, id " << transId);
+            if (transMap.find(transId) == transMap.end()) {
+                QPID_LOG(debug,
+                         "Txn " << transId << " doesn't exist; discarded");
+            }
+            else {
+                transMap[transId]->state = Transaction::TRANS_ABORTED;
+            }
+            break;
+        case TransactionDeleteEntry:
+            transId = lsnToId(transLsn);
+            QPID_LOG(debug, "Txn delete, id " << transId);
+            if (transMap.find(transId) == transMap.end()) {
+                QPID_LOG(debug,
+                         "Txn " << transId << " doesn't exist; discarded");
+            }
+            else {
+                transMap[transId]->state = Transaction::TRANS_DELETED;
+                transMap.erase(transId);
+            }
+            break;
+        default:
+            throw std::runtime_error("Bad transaction log entry type");
+        }
+
+        recordType = ClfsDataRecord;
+        ok = ::ReadNextLogRecord(readContext,
+                                 &recordPointer,
+                                 &recordLength,
+                                 &recordType,
+                                 0,             // No userLsn
+                                 &undoNext,
+                                 &transLsn,
+                                 &current,
+                                 0);
+    }
+    DWORD status = ::GetLastError();
+    ::TerminateReadLog(readContext);
+    if (status != ERROR_HANDLE_EOF)  // No more records
+        throw QPID_WINDOWS_ERROR(status);
+
+    // At this point we have a list of all the not-deleted transactions that
+    // were in existence when the broker last ran. All transactions of both
+    // Dtx and Tx types that haven't prepared or committed will be aborted.
+    // This will give the proper background against which to decide each
+    // message's disposition when recovering messages that were involved in
+    // transactions.
+    // In addition to recovering and aborting transactions, rebuild the
+    // validIds map now that we know which ids are really valid.
+    std::map<uint64_t, Transaction::shared_ptr>::const_iterator i;
+    for (i = transMap.begin(); i != transMap.end(); ++i) {
+        switch(i->second->state) {
+        case Transaction::TRANS_OPEN:
+            QPID_LOG(debug, "Txn " << i->first << " was open; aborted");
+            i->second->state = Transaction::TRANS_ABORTED;
+            break;
+        case Transaction::TRANS_ABORTED:
+            QPID_LOG(debug, "Txn " << i->first << " was aborted");
+            break;
+        case Transaction::TRANS_COMMITTED:
+            QPID_LOG(debug, "Txn " << i->first << " was committed");
+            break;
+        case Transaction::TRANS_PREPARED:
+            QPID_LOG(debug, "Txn " << i->first << " was prepared");
+            break;
+        case Transaction::TRANS_DELETED:
+            QPID_LOG(error,
+                     "Txn " << i->first << " was deleted; shouldn't be here");
+            break;
+        }
+        boost::weak_ptr<Transaction> weak_txn(i->second);
+        validIds[i->first] = weak_txn;
+    }
+}
+
+}}}  // namespace qpid::store::ms_clfs

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h Thu Oct 21 23:09:00 2010
@@ -0,0 +1,98 @@
+#ifndef QPID_STORE_MSCLFS_TRANSACTIONLOG_H
+#define QPID_STORE_MSCLFS_TRANSACTIONLOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <set>
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <qpid/broker/RecoveryManager.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/Mutex.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Transaction;
+class TPCTransaction;
+
+/**
+ * @class TransactionLog
+ *
+ * Represents a CLFS-housed transaction log.
+ */
+class TransactionLog : public Log,
+                       public boost::enable_shared_from_this<TransactionLog> {
+
+    // To know when it's ok to move the log tail the lowest valid Id must
+    // always be known. Keep track of valid Ids here. These are transactions
+    // which have not yet been Deleted in the log. They may be new, in progress,
+    // prepared, committed, or aborted - but not deleted.
+    // Entries corresponding to not-yet-finalized transactions (i.e., open or
+    // prepared) also have a weak_ptr so the Transaction can be accessed.
+    // This is primarily to check its state and get a list of prepared Xids.
+    std::map<uint64_t, boost::weak_ptr<Transaction> > validIds;
+    qpid::sys::Mutex idsLock;
+
+public:
+    // Inherited and reimplemented from Log. Figure the minimum marshalling
+    // buffer size needed for the records this class writes.
+    virtual uint32_t marshallingBufferSize();
+
+    typedef boost::shared_ptr<TransactionLog> shared_ptr;
+
+    // Get a new Transaction
+    boost::shared_ptr<Transaction> begin();
+
+    // Get a new TPCTransaction
+    boost::shared_ptr<TPCTransaction> begin(const std::string& xid);
+
+    void recordPrepare(uint64_t transId);
+    void recordCommit(uint64_t transId);
+    void recordAbort(uint64_t transId);
+    void deleteTransaction(uint64_t transId);
+
+    // Fill @arg preparedMap with Xid->TPCTransaction::shared_ptr for all
+    // currently prepared transactions.
+    void collectPreparedXids(std::map<std::string, boost::shared_ptr<TPCTransaction> >& preparedMap);
+
+    // Recover the transactions and their state from the log.
+    // Every non-deleted transaction recovered from the log will be
+    // represented in @arg transMap. The recovering messages can use this
+    // information to tell if a transaction referred to in an enqueue/dequeue
+    // operation should be recovered or dropped by examining transaction state.
+    //
+    // @param recoverer  Recovery manager used to recreate broker objects from
+    //                   entries recovered from the log.
+    // @param transMap   This method fills in the map of id -> shared_ptr of
+    //                   recovered transactions.
+    void recover(std::map<uint64_t, boost::shared_ptr<Transaction> >& transMap);
+};
+
+}}}  // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_TRANSACTIONLOG_H */

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp?rev=1026175&r1=1026174&r2=1026175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp Thu Oct 21 23:09:00 2010
@@ -224,8 +224,7 @@ MessageMapRecordset::recover(MessageQueu
                        (LPVOID *)&piAdoRecordBinding);
     piAdoRecordBinding->BindToRecordset(&b);
     while (!rs->EndOfFile) {
-        qpid::store::QueueEntry entry;
-        entry.queueId = b.queueId;
+        qpid::store::QueueEntry entry(b.queueId);
         if (b.xidStatus == adFldOK && b.xidLength > 0) {
             entry.xid.assign(b.xid, b.xidLength);
             entry.tplStatus =



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org