You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2010/04/16 22:12:56 UTC

svn commit: r935068 - in /qpid/trunk/qpid/cpp/src/qpid/store: ./ ms-sql/

Author: shuston
Date: Fri Apr 16 20:12:55 2010
New Revision: 935068

URL: http://svn.apache.org/viewvc?rev=935068&view=rev
Log:
Fix for QPID-2420 to correctly handle restoring, committing, and aborting prepared transactions.
The basic approach is documented in QPID-2420. This also improves the way changes are made to the tblMessageMap table, which should perform much better by avoiding pulling the whole table into the broker just to add, edit, or delete a single record. Also, some of the consistency checks and enforcements are moved from the C++ code into the database itself.

Added:
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h
Modified:
    qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Exception.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt Fri Apr 16 20:12:55 2010
@@ -71,7 +71,9 @@ if (BUILD_MSSQL)
                ms-sql/MessageMapRecordset.cpp
                ms-sql/MessageRecordset.cpp
                ms-sql/Recordset.cpp
+               ms-sql/SqlTransaction.cpp
                ms-sql/State.cpp
+               ms-sql/TplRecordset.cpp
                ms-sql/VariantHelper.cpp)
   target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
   install (TARGETS mssql_store # RUNTIME

Modified: qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Fri Apr 16 20:12:55 2010
@@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::Reco
     QueueMap queues;
     MessageMap messages;
     MessageQueueMap messageQueueMap;
+    std::vector<std::string> xids;
+    PreparedTransactionMap dtxMap;
 
     provider->second->recoverConfigs(recoverer);
     provider->second->recoverExchanges(recoverer, exchanges);
     provider->second->recoverQueues(recoverer, queues);
     provider->second->recoverBindings(recoverer, exchanges, queues);
+    provider->second->recoverTransactions(recoverer, dtxMap);
     provider->second->recoverMessages(recoverer, messages, messageQueueMap);
     // Enqueue msgs where needed.
     for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
@@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::Reco
         broker::RecoverableMessage::shared_ptr msg = iMsg->second;
         // Now for each queue referenced in the queue map, locate it
         // and re-enqueue the message.
-        for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+        for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
              j != i->second.end();
              ++j) {
             // Locate the queue corresponding to the current queue Id
-            QueueMap::const_iterator iQ = queues.find(*j);
+            QueueMap::const_iterator iQ = queues.find(j->queueId);
             if (iQ == queues.end()) {
                 std::ostringstream oss;
                 oss << "No matching queue trying to re-enqueue message "
-                    << " on queue Id " << *j;
+                    << " on queue Id " << j->queueId;
                 THROW_STORE_EXCEPTION(oss.str());
             }
-            iQ->second->recover(msg);
+            // Messages involved in prepared transactions have their status
+            // updated accordingly. First, though, restore a message that
+            // is expected to be on a queue, including non-transacted
+            // messages and those pending dequeue in a dtx.
+            if (j->tplStatus != QueueEntry::ADDING)
+                iQ->second->recover(msg);
+            switch(j->tplStatus) {
+            case QueueEntry::ADDING:
+                dtxMap[j->xid]->enqueue(iQ->second, msg);
+                break;
+            case QueueEntry::REMOVING:
+                dtxMap[j->xid]->dequeue(iQ->second, msg);
+                break;
+            }
         }
     }
-
-    // recoverTransactions() and apply correctly while re-enqueuing
 }
 
 }} // namespace qpid::store

Modified: qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h Fri Apr 16 20:12:55 2010
@@ -44,8 +44,16 @@ typedef std::map<uint64_t, qpid::broker:
     QueueMap;
 typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr>
     MessageMap;
-// Msg Id -> vector of queue Ids where message is queued
-typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap;
+// Msg Id -> vector of queue entries where message is queued
+struct QueueEntry {  // One queue placement of a recovered message.
+    enum TplStatus { NONE = 0, ADDING = 1, REMOVING = 2 };  // prepared-transaction state of the entry
+    uint64_t queueId;     // Id used to look the queue up in the QueueMap during recovery
+    TplStatus tplStatus;  // ADDING: enqueue pending in xid; REMOVING: dequeue pending in xid
+    std::string xid;      // key into PreparedTransactionMap; presumably empty when tplStatus is NONE
+};
+typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap;
+typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr>
+    PreparedTransactionMap;
 
 class MessageStorePlugin;
 
@@ -316,6 +324,8 @@ public:
     virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
                                  MessageMap& messageMap,
                                  MessageQueueMap& messageQueueMap) = 0;
+    virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+                                     PreparedTransactionMap& dtxMap) = 0;
     //@}
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp Fri Apr 16 20:12:55 2010
@@ -26,51 +26,37 @@ namespace qpid {
 namespace store {
 namespace ms_sql {
 
-AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db)
-  : db(_db), transDepth(0)
+AmqpTransaction::AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+  : db(_db), sqlTrans(_db)
 {
 }
 
 AmqpTransaction::~AmqpTransaction()
 {
-    if (transDepth > 0)
-        this->abort();
 }
 
 void
-AmqpTransaction::begin()
+AmqpTransaction::sqlBegin()
 {
-    _bstr_t beginCmd("BEGIN TRANSACTION");
-    _ConnectionPtr c = *db;
-    c->Execute(beginCmd, NULL, adExecuteNoRecords);
-    ++transDepth;
+    sqlTrans.begin();
 }
 
 void
-AmqpTransaction::commit()
+AmqpTransaction::sqlCommit()
 {
-    if (transDepth > 0) {
-        _bstr_t commitCmd("COMMIT TRANSACTION");
-        _ConnectionPtr c = *db;
-        c->Execute(commitCmd, NULL, adExecuteNoRecords);
-        --transDepth;
-    }
+    sqlTrans.commit();
 }
 
 void
-AmqpTransaction::abort()
+AmqpTransaction::sqlAbort()
 {
-    if (transDepth > 0) {
-        _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
-        _ConnectionPtr c = *db;
-        c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
-        transDepth = 0;
-    }
+    sqlTrans.abort();
 }
 
-AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+
+AmqpTPCTransaction::AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db,
                                        const std::string& _xid)
-  : AmqpTransaction(_db), xid(_xid)
+  : AmqpTransaction(db), prepared(false), xid(_xid)
 {
 }
 
@@ -78,12 +64,4 @@ AmqpTPCTransaction::~AmqpTPCTransaction(
 {
 }
 
-void
-AmqpTPCTransaction::prepare()
-{
-    // Intermediate transactions should have already assured integrity of
-    // the content in the database; just waiting to pull the trigger on the
-    // outermost transaction.
-}
-
 }}}  // namespace qpid::store::ms_sql

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h Fri Apr 16 20:12:55 2010
@@ -23,8 +23,10 @@
  */
 
 #include <qpid/broker/TransactionalStore.h>
+#include <boost/shared_ptr.hpp>
 #include <string>
-#include <memory>
+
+#include "SqlTransaction.h"
 
 namespace qpid {
 namespace store {
@@ -41,23 +43,18 @@ class DatabaseConnection;
  */
 class AmqpTransaction : public qpid::broker::TransactionContext {
 
-    std::auto_ptr<DatabaseConnection> db;
-
-    // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
-    // et al, nested transactions are carried out with direct SQL commands.
-    // To ensure the state of this is known, keep track of how deeply the
-    // transactions are nested.
-    unsigned int transDepth;
+    boost::shared_ptr<DatabaseConnection> db;
+    SqlTransaction sqlTrans;
 
 public:
-    AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db);
+    AmqpTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
     virtual ~AmqpTransaction();
 
     DatabaseConnection *dbConn() { return db.get(); }
 
-    void begin();
-    void commit();
-    void abort();
+    void sqlBegin();
+    void sqlCommit();
+    void sqlAbort();
 };
 
 /**
@@ -69,14 +66,18 @@ public:
  */
 class AmqpTPCTransaction : public AmqpTransaction,
                            public qpid::broker::TPCTransactionContext {
+    bool prepared;
     std::string  xid;
 
 public:
-    AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+    AmqpTPCTransaction(const boost::shared_ptr<DatabaseConnection>& db,
                        const std::string& _xid);
     virtual ~AmqpTPCTransaction();
 
-    void prepare();
+    void setPrepared(void) { prepared = true; }
+    bool isPrepared(void) const { return prepared; }
+
+    const std::string& getXid(void) const { return xid; }
 };
 
 }}}  // namespace qpid::store::ms_sql

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp Fri Apr 16 20:12:55 2010
@@ -67,4 +67,25 @@ DatabaseConnection::close()
     conn = 0;
 }
 
+// Gather the ADO provider errors held on this connection, one per line ("" if none).
+std::string
+DatabaseConnection::getErrors()
+{
+    long errCount = conn->Errors->Count;
+    if (errCount <= 0)
+        return "";
+    // Collection ranges from 0 to errCount - 1.
+    std::ostringstream messages;
+    for (long i = 0 ; i < errCount ; i++ ) {
+        if (i > 0)
+            messages << "\n";
+        ErrorPtr pErr = conn->Errors->GetItem(i);
+        messages << "[" << i << "] "
+                 << "Error " << pErr->Number << ": "
+                 << (LPCSTR)pErr->Description;
+    }
+    // No std::ends: str() is already a complete string; ends would embed a NUL.
+    return messages.str();
+}
+
 }}}  // namespace qpid::store::ms_sql

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h Fri Apr 16 20:12:55 2010
@@ -55,6 +55,8 @@ public:
     void beginTransaction() { conn->BeginTrans(); }
     void commitTransaction() {conn->CommitTrans(); }
     void rollbackTransaction() { conn->RollbackTrans(); }
+
+    std::string getErrors();
 };
 
 }}}  // namespace qpid::store::ms_sql

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Exception.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Exception.h?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Exception.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Exception.h Fri Apr 16 20:12:55 2010
@@ -43,7 +43,9 @@ public:
 class ADOException : public Exception
 {
 public:
-    ADOException(const std::string& _text, _com_error &e)
+ ADOException(const std::string& _text,
+              _com_error &e,
+              const std::string& providerErrors = "")
       : Exception(_text) {
         text += ": ";
         text += e.ErrorMessage();
@@ -54,6 +56,8 @@ public:
             text += (const char *)wmsg;
             i->Release();
         }
+        if (providerErrors.length() > 0)
+            text += providerErrors;
     }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Fri Apr 16 20:12:55 2010
@@ -32,6 +32,7 @@
 #include "BindingRecordset.h"
 #include "MessageMapRecordset.h"
 #include "MessageRecordset.h"
+#include "TplRecordset.h"
 #include "DatabaseConnection.h"
 #include "Exception.h"
 #include "State.h"
@@ -51,6 +52,7 @@ const std::string TblExchange("tblExchan
 const std::string TblMessage("tblMessage");
 const std::string TblMessageMap("tblMessageMap");
 const std::string TblQueue("tblQueue");
+const std::string TblTpl("tblTPL");
 }
 
 namespace qpid {
@@ -256,9 +258,7 @@ public:
     virtual void prepare(qpid::broker::TPCTransactionContext& txn);
     virtual void commit(qpid::broker::TransactionContext& txn);
     virtual void abort(qpid::broker::TransactionContext& txn);
-
-    // @TODO This maybe should not be in TransactionalStore
-    virtual void collectPreparedXids(std::set<std::string>& xids) {}
+    virtual void collectPreparedXids(std::set<std::string>& xids);
     //@}
 
     virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
@@ -272,6 +272,8 @@ public:
     virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
                                  MessageMap& messageMap,
                                  MessageQueueMap& messageQueueMap);
+    virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+                                     PreparedTransactionMap& dtxMap);
 
 private:
     struct ProviderOptions : public qpid::Options
@@ -310,7 +312,7 @@ private:
 
     State *initState();
     DatabaseConnection *initConnection(void);
-    void createDb(_ConnectionPtr conn, const std::string &name);
+    void createDb(DatabaseConnection *db, const std::string &name);
 };
 
 static MSSqlProvider static_instance_registers_plugin;
@@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::T
                 // Database doesn't exist; create it
                 QPID_LOG(notice,
                          "MSSQL: Creating database " + options.catalogName);
-                createDb(conn, options.catalogName);
+                createDb(db.get(), options.catalogName);
             }
             else {
                 QPID_LOG(notice,
@@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& 
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error creating queue " + queue.getName(), e);
+        throw ADOException("Error creating queue " + queue.getName(), e, errs);
     }
 }
 
@@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& 
 void
 MSSqlProvider::destroy(PersistableQueue& queue)
 {
-    // MessageDeleter class for use with for_each, below.
-    class MessageDeleter {
-        BlobRecordset& msgs;
-    public:
-        explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
-        void operator()(uint64_t msgId) { msgs.remove(msgId); }
-    };
-
     DatabaseConnection *db = initConnection();
     BlobRecordset rsQueues;
     BindingRecordset rsBindings;
@@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue&
         // under the references in the bindings table. Then remove the
         // message->queue entries for the queue, also because the queue can't
         // be deleted while there are references to it. If there are messages
-        // orphaned by removing the queue references, those messages can
-        // also be deleted. Lastly, the queue record can be removed.
+        // orphaned by removing the queue references, they're deleted by
+        // a trigger on the tblMessageMap table. Lastly, the queue record
+        // can be removed.
         rsBindings.removeForQueue(queue.getPersistenceId());
-        std::vector<uint64_t> orphans;
-        rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
-        std::for_each(orphans.begin(), orphans.end(),
-                      MessageDeleter(rsMessages));
+        rsMessageMaps.removeForQueue(queue.getPersistenceId());
         rsQueues.remove(queue);
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error deleting queue " + queue.getName(), e);
+        throw ADOException("Error deleting queue " + queue.getName(), e, errs);
     }
 }
 
@@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableE
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error creating exchange " + exchange.getName(), e);
+        throw ADOException("Error creating exchange " + exchange.getName(),
+                           e,
+                           errs);
     }
 }
 
@@ -498,8 +495,11 @@ MSSqlProvider::destroy(const Persistable
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error deleting exchange " + exchange.getName(), e);
+        throw ADOException("Error deleting exchange " + exchange.getName(),
+                           e,
+                           errs);
     }
 }
 
@@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExc
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
         throw ADOException("Error binding exchange " + exchange.getName() +
-                           " to queue " + queue.getName(), e);
+                           " to queue " + queue.getName(),
+                           e,
+                           errs);
     }
 }
 
@@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableE
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
         throw ADOException("Error unbinding exchange " + exchange.getName() +
-                           " from queue " + queue.getName(), e);
+                           " from queue " + queue.getName(),
+                           e,
+                           errs);
     }
 }
 
@@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableC
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error creating config " + config.getName(), e);
+        throw ADOException("Error creating config " + config.getName(), e, errs);
     }
 }
 
@@ -592,8 +599,9 @@ MSSqlProvider::destroy(const Persistable
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error deleting config " + config.getName(), e);
+        throw ADOException("Error deleting config " + config.getName(), e, errs);
     }
 }
 
@@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrus
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error staging message", e);
+        throw ADOException("Error staging message", e, errs);
     }  
 }
 
@@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessag
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error deleting message", e);
+        throw ADOException("Error deleting message", e, errs);
     }
 }
 
@@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost
         db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         db->rollbackTransaction();
-        throw ADOException("Error appending to message", e);
+        throw ADOException("Error appending to message", e, errs);
     }  
 }
 
@@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::b
         rsMessages.loadContent(msg, data, offset, length);
     }
     catch(_com_error &e) {
-        throw ADOException("Error loading message content", e);
+        std::string errs = db->getErrors();
+        throw ADOException("Error loading message content", e, errs);
     }  
 }
 
@@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::Tra
     // this is not in the context of a transaction, then just use the thread's
     // DatabaseConnection with a ADO transaction.
     DatabaseConnection *db = 0;
+    std::string xid;
     AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
     if (atxn == 0) {
         db = initConnection();
@@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::Tra
     }
     else {
         (void)initState();     // Ensure this thread is initialized
+        // It's a transactional enqueue; if it's TPC, grab the xid.
+        AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+        if (tpcTxn)
+            xid = tpcTxn->getXid();
         db = atxn->dbConn();
         try {
-          atxn->begin();
+            atxn->sqlBegin();
         }
         catch(_com_error &e) {
-            throw ADOException("Error queuing message", e);
+            throw ADOException("Error queuing message", e, db->getErrors());
         }
     }
 
@@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::Tra
             rsMessages.add(msg);
         }
         rsMap.open(db, TblMessageMap);
-        rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
+        rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid);
         if (atxn)
-            atxn->commit();
+            atxn->sqlCommit();
         else
             db->commitTransaction();
     }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         if (atxn)
-            atxn->abort();
+            atxn->sqlAbort();
         else
             db->rollbackTransaction();
-        throw ADOException("Error queuing message", e);
+        throw ADOException("Error queuing message", e, errs);
     }
     msg->enqueueComplete();
 }
@@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::Tra
     // this is not in the context of a transaction, then just use the thread's
     // DatabaseConnection with a ADO transaction.
     DatabaseConnection *db = 0;
+    std::string xid;
     AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
     if (atxn == 0) {
         db = initConnection();
@@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::Tra
     }
     else {
         (void)initState();     // Ensure this thread is initialized
+        // It's a transactional dequeue; if it's TPC, grab the xid.
+        AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
+        if (tpcTxn)
+            xid = tpcTxn->getXid();
         db = atxn->dbConn();
         try {
-            atxn->begin();
+            atxn->sqlBegin();
         }
         catch(_com_error &e) {
-            throw ADOException("Error queuing message", e);
+            throw ADOException("Error queuing message", e, db->getErrors());
         }
     }
 
     MessageMapRecordset rsMap;
-    MessageRecordset rsMessages;
     try {
         rsMap.open(db, TblMessageMap);
-        bool more = rsMap.remove(msg->getPersistenceId(),
-                                 queue.getPersistenceId());
-        if (!more) {
-            rsMessages.open(db, TblMessage);
-            rsMessages.remove(msg);
+        // TPC dequeues are just marked pending and will actually be removed
+        // when the transaction commits; Single-phase dequeues are removed
+        // now, relying on the SQL transaction to put it back if the
+        // transaction doesn't commit.
+        if (!xid.empty()) {
+            rsMap.pendingRemove(msg->getPersistenceId(),
+                                queue.getPersistenceId(),
+                                xid);
+        }
+        else {
+            rsMap.remove(msg->getPersistenceId(),
+                         queue.getPersistenceId());
         }
         if (atxn)
-            atxn->commit();
+            atxn->sqlCommit();
         else
             db->commitTransaction();
     }
+    catch(ms_sql::Exception&) {
+        if (atxn)
+            atxn->sqlAbort();
+        else
+            db->rollbackTransaction();
+        throw;
+    }
     catch(_com_error &e) {
+        std::string errs = db->getErrors();
         if (atxn)
-            atxn->abort();
+            atxn->sqlAbort();
         else
             db->rollbackTransaction();
-        throw ADOException("Error dequeuing message", e);
+        throw ADOException("Error dequeuing message", e, errs);
     }  
     msg->dequeueComplete();
 }
@@ -827,10 +864,10 @@ MSSqlProvider::begin()
     // it safe and handle this just like a TPC transaction, which actually
     // can be prepared and committed/aborted from different threads,
     // making it a bad idea to try using the thread-local DatabaseConnection.
-    std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+    boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
     db->open(options.connectString, options.catalogName);
     std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
-    tx->begin();
+    tx->sqlBegin();
     std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
     return tc;
 }
@@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransacti
 MSSqlProvider::begin(const std::string& xid)
 {
     (void)initState();     // Ensure this thread is initialized
-    std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+    boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
     db->open(options.connectString, options.catalogName);
     std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
-    tx->begin();
+    tx->sqlBegin();
+
+    TplRecordset rsTpl;
+    try {
+        tx->sqlBegin();
+        rsTpl.open(db.get(), TblTpl);
+        rsTpl.add(xid);
+        tx->sqlCommit();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        tx->sqlAbort();
+        throw ADOException("Error adding TPL record", e, errs);
+    }  
+
     std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
     return tc;
 }
@@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& 
 void
 MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
 {
-    // The inner transactions used for the components of the TPC are done;
-    // nothing else to do but wait for the commit.
+    // Commit all the marked-up enqueue/dequeue ops and the TPL record.
+    // On commit/rollback the TPL will be removed and the TPL markups
+    // on the message map will be cleaned up as well.
+    (void)initState();     // Ensure this thread is initialized
+    AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+    if (atxn == 0)
+        throw qpid::broker::InvalidTransactionContextException();
+    try {
+        atxn->sqlCommit();
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error preparing", e, atxn->dbConn()->getErrors());
+    }  
+    atxn->setPrepared();
 }
 
 void
 MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
 {
     (void)initState();     // Ensure this thread is initialized
-    AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
-    if (atxn == 0)
-        throw qpid::broker::InvalidTransactionContextException();
-    atxn->commit();
+    /*
+     * One-phase transactions simply commit the outer SQL transaction
+     * that was begun on begin(). Two-phase transactions are different -
+     * the SQL transaction started on begin() was committed on prepare()
+     * so all the SQL records reflecting the enqueue/dequeue actions for
+     * the transaction are recorded but with xid markups on them to reflect
+     * that they are prepared but not committed. Now go back and remove
+     * the markups, deleting those marked for removal.
+     */
+    AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+    if (p2txn == 0) {
+        AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+        if (p1txn == 0)
+            throw qpid::broker::InvalidTransactionContextException();
+        p1txn->sqlCommit();
+        return;
+    }
+
+    DatabaseConnection *db(p2txn->dbConn());
+    TplRecordset rsTpl;
+    MessageMapRecordset rsMessageMap;
+    try {
+        db->beginTransaction();
+        rsTpl.open(db, TblTpl);
+        rsMessageMap.open(db, TblMessageMap);
+        rsMessageMap.commitPrepared(p2txn->getXid());
+        rsTpl.remove(p2txn->getXid());
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        db->rollbackTransaction();
+        throw ADOException("Error committing transaction", e, errs);
+    }  
 }
 
 void
 MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
 {
     (void)initState();     // Ensure this thread is initialized
+    /*
+     * One-phase and non-prepared two-phase transactions simply abort
+     * the outer SQL transaction that was begun on begin(). However, prepared
+     * two-phase transactions are different - the SQL transaction started
+     * on begin() was committed on prepare() so all the SQL records
+     * reflecting the enqueue/dequeue actions for the transaction are
+     * recorded but with xid markups on them to reflect that they are
+     * prepared but not committed. Now go back and remove the markups,
+     * deleting those marked for addition.
+     */
+    AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
+    if (p2txn == 0 || !p2txn->isPrepared()) {
+        AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
+        if (p1txn == 0)
+            throw qpid::broker::InvalidTransactionContextException();
+        p1txn->sqlAbort();
+        return;
+    }
+
+    DatabaseConnection *db(p2txn->dbConn());
+    TplRecordset rsTpl;
+    MessageMapRecordset rsMessageMap;
+    try {
+        db->beginTransaction();
+        rsTpl.open(db, TblTpl);
+        rsMessageMap.open(db, TblMessageMap);
+        rsMessageMap.abortPrepared(p2txn->getXid());
+        rsTpl.remove(p2txn->getXid());
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        db->rollbackTransaction();
+        throw ADOException("Error aborting transaction", e, errs);
+    }  
+    // NOTE(review): a prepared abort falls through to the legacy tail below
+    // and still calls sqlAbort() - confirm that is intended (or return here).
+    (void)initState();     // Ensure this thread is initialized
     AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
     if (atxn == 0)
         throw qpid::broker::InvalidTransactionContextException();
-    atxn->abort();
+    atxn->sqlAbort();
+}
+
+void
+MSSqlProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        TplRecordset rsTpl;
+        rsTpl.open(db, TblTpl);
+        rsTpl.recover(xids);
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error reading TPL", e, db->getErrors());
+    }  
 }
 
 // @TODO Much of this recovery code is way too similar... refactor to
@@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::bro
     rsMessageMaps.recover(messageQueueMap);
 }
 
+void
+MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+                                   PreparedTransactionMap& dtxMap)
+{
+    DatabaseConnection *db = initConnection();
+    std::set<std::string> xids;
+    try {
+        TplRecordset rsTpl;
+        rsTpl.open(db, TblTpl);
+        rsTpl.recover(xids);
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error recovering TPL records", e, db->getErrors());
+    }  
+
+    try {
+        // Rebuild the needed RecoverableTransactions.
+        for (std::set<std::string>::const_iterator iXid = xids.begin();
+             iXid != xids.end();
+             ++iXid) {
+            boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection);
+            dbX->open(options.connectString, options.catalogName);
+            std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX,
+                                                                        *iXid));
+            tx->setPrepared();
+            std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+            dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc);
+        }
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error recreating dtx connection", e);
+    }  
+}
+
 ////////////// Internal Methods
 
 State *
@@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void)
 }
 
 void
-MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
+MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name)
 {
     const std::string dbCmd = "CREATE DATABASE " + name;
     const std::string useCmd = "USE " + name;
@@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr c
         "  fieldTableBlob varbinary(MAX))";
     const std::string messageMapSpecs =
         " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
-        "  queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)";
+        "  queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+        "  prepareStatus tinyint CHECK (prepareStatus IS NULL OR "
+        "    prepareStatus IN (1, 2)),"
+        "  xid varbinary(512) REFERENCES tblTPL(xid)"
+        "  CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )";
+    const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)";
     _variant_t unused;
     _bstr_t dbStr = dbCmd.c_str();
+    _ConnectionPtr conn(*db);
     try {
         conn->Execute(dbStr, &unused, adExecuteNoRecords);
         _bstr_t useStr = useCmd.c_str();
@@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr c
         makeTable = tableCmd + TblBinding + bindingSpecs;
         makeTableStr = makeTable.c_str();
         conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+        makeTable = tableCmd + TblTpl + tplSpecs;
+        makeTableStr = makeTable.c_str();
+        conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
         makeTable = tableCmd + TblMessageMap + messageMapSpecs;
         makeTableStr = makeTable.c_str();
         conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
     }
     catch(_com_error &e) {
-        throw ADOException("MSSQL can't create " + name, e);
+        throw ADOException("MSSQL can't create " + name, e, db->getErrors());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp Fri Apr 16 20:12:55 2010
@@ -24,96 +24,197 @@
 #include <qpid/store/StorageProvider.h>
 
 #include "MessageMapRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
 #include "VariantHelper.h"
 
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
 namespace qpid {
 namespace store {
 namespace ms_sql {
 
 void
-MessageMapRecordset::add(uint64_t messageId, uint64_t queueId)
+MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table)
 {
-    rs->AddNew();
-    rs->Fields->GetItem("messageId")->Value = messageId;
-    rs->Fields->GetItem("queueId")->Value = queueId;
-    rs->Update();
+    init(conn, table);
 }
 
-bool
+void
+MessageMapRecordset::add(uint64_t messageId,
+                         uint64_t queueId,
+                         const std::string& xid)
+{
+    std::ostringstream command;
+    command << "INSERT INTO " << tableName
+            << " (messageId, queueId";
+    if (!xid.empty())
+        command << ", prepareStatus, xid";
+    command << ") VALUES (" << messageId << "," << queueId;
+    if (!xid.empty())
+        command << "," << PREPARE_ADD << ",?";
+    command << ")" << std::ends;
+
+    _CommandPtr cmd = NULL;
+    _ParameterPtr xidVal = NULL;
+    TESTHR(cmd.CreateInstance(__uuidof(Command)));
+    _ConnectionPtr p = *dbConn;
+    cmd->ActiveConnection = p;
+    cmd->CommandText = command.str().c_str();
+    cmd->CommandType = adCmdText;
+    if (!xid.empty()) {
+        TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+        xidVal->Name = "@xid";
+        xidVal->Type = adVarBinary;
+        xidVal->Size = xid.length();
+        xidVal->Direction = adParamInput;
+        xidVal->Value = BlobEncoder(xid);
+        cmd->Parameters->Append(xidVal);
+    }
+    cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
 MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
 {
-    // Look up all mappings for the specified message. Then scan
-    // for the specified queue and keep track of whether or not the
-    // message exists on any queue we are not looking for a well.
-    std::ostringstream filter;
-    filter << "messageId = " << messageId << std::ends;
-    rs->PutFilter (VariantHelper<std::string>(filter.str()));
-    if (rs->RecordCount == 0)
-        return false;
+    std::ostringstream command;
+    command << "DELETE FROM " << tableName
+            << " WHERE queueId = " << queueId
+            << " AND messageId = " << messageId << std::ends;
+    _CommandPtr cmd = NULL;
+    TESTHR(cmd.CreateInstance(__uuidof(Command)));
+    _ConnectionPtr p = *dbConn;
+    cmd->ActiveConnection = p;
+    cmd->CommandText = command.str().c_str();
+    cmd->CommandType = adCmdText;
+    _variant_t deletedRecords;
+    cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
+    if ((long)deletedRecords == 0)
+        throw ms_sql::Exception("Message does not exist in queue mapping");
+    // Trigger on deleting the mapping takes care of deleting orphaned
+    // message record from tblMessage.
+}
+
+void
+MessageMapRecordset::pendingRemove(uint64_t messageId,
+                                   uint64_t queueId,
+                                   const std::string& xid)
+{
+    // Look up the mapping for the specified message and queue. There
+    // should be only one because of the uniqueness constraint in the
+    // SQL table. Update it to reflect it's pending delete with
+    // the specified xid.
+    std::ostringstream command;
+    command << "UPDATE " << tableName
+            << " SET prepareStatus=" << PREPARE_REMOVE
+            << " , xid=?"
+            << " WHERE queueId = " << queueId
+            << " AND messageId = " << messageId << std::ends;
+
+    _CommandPtr cmd = NULL;
+    _ParameterPtr xidVal = NULL;
+    TESTHR(cmd.CreateInstance(__uuidof(Command)));
+    TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+    _ConnectionPtr p = *dbConn;
+    cmd->ActiveConnection = p;
+    cmd->CommandText = command.str().c_str();
+    cmd->CommandType = adCmdText;
+    xidVal->Name = "@xid";
+    xidVal->Type = adVarBinary;
+    xidVal->Size = xid.length();
+    xidVal->Direction = adParamInput;
+    xidVal->Value = BlobEncoder(xid);
+    cmd->Parameters->Append(xidVal);
+    cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::removeForQueue(uint64_t queueId)
+{
+    std::ostringstream command;
+    command << "DELETE FROM " << tableName
+            << " WHERE queueId = " << queueId << std::ends;
+    _CommandPtr cmd = NULL;
+
+    TESTHR(cmd.CreateInstance(__uuidof(Command)));
+    _ConnectionPtr p = *dbConn;
+    cmd->ActiveConnection = p;
+    cmd->CommandText = command.str().c_str();
+    cmd->CommandType = adCmdText;
+    cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+void
+MessageMapRecordset::commitPrepared(const std::string& xid)
+{
+    // Find all the records for the specified xid. Records marked as adding
+    // are now permanent so remove the xid and prepareStatus. Records marked
+    // as removing are removed entirely.
+    openRs();
     MessageMap m;
     IADORecordBinding *piAdoRecordBinding;
     rs->QueryInterface(__uuidof(IADORecordBinding),
                        (LPVOID *)&piAdoRecordBinding);
     piAdoRecordBinding->BindToRecordset(&m);
-    bool moreEntries = false, deleted = false;
-    rs->MoveFirst();
-    // If the desired mapping gets deleted, and we already know there are
-    // other mappings for the message, don't bother finishing the scan.
-    while (!rs->EndOfFile && !(deleted && moreEntries)) {
-        if (m.queueId == queueId) {
+    for (; !rs->EndOfFile; rs->MoveNext()) {
+        if (m.xidStatus != adFldOK)
+            continue;
+        const std::string x(m.xid, m.xidLength);
+        if (x != xid)
+            continue;
+        if (m.prepareStatus == PREPARE_REMOVE) {
             rs->Delete(adAffectCurrent);
-            rs->Update();
-            deleted = true;
         }
         else {
-            moreEntries = true;
+            _variant_t dbNull;
+            dbNull.ChangeType(VT_NULL);
+            rs->Fields->GetItem("prepareStatus")->Value = dbNull;
+            rs->Fields->GetItem("xid")->Value = dbNull;
         }
-        rs->MoveNext();
+        rs->Update();
     }
     piAdoRecordBinding->Release();
-    rs->Filter = "";
-    return moreEntries;
 }
 
 void
-MessageMapRecordset::removeForQueue(uint64_t queueId,
-                                    std::vector<uint64_t>& orphaned)
+MessageMapRecordset::abortPrepared(const std::string& xid)
 {
-    // Read all the messages queued on queueId and add them to the orphaned
-    // list. Then remove each one and learn if there are references to it
-    // from other queues. The ones without references are left in the
-    // orphaned list, others are removed.
-    std::ostringstream filter;
-    filter << "queueId = " << queueId << std::ends;
-    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    // Find all the records for the specified xid. Records marked as adding
+    // need to be removed while records marked as removing are put back to
+    // no xid and no prepareStatus.
+    openRs();
     MessageMap m;
     IADORecordBinding *piAdoRecordBinding;
     rs->QueryInterface(__uuidof(IADORecordBinding), 
                        (LPVOID *)&piAdoRecordBinding);
     piAdoRecordBinding->BindToRecordset(&m);
-    while (!rs->EndOfFile) {
-        orphaned.push_back(m.messageId);
-        rs->MoveNext();
+    for (; !rs->EndOfFile; rs->MoveNext()) {
+        if (m.xidStatus != adFldOK)
+            continue;
+        const std::string x(m.xid, m.xidLength);
+        if (x != xid)
+            continue;
+        if (m.prepareStatus == PREPARE_ADD) {
+            rs->Delete(adAffectCurrent);
+        }
+        else {
+            _variant_t dbNull;
+            dbNull.ChangeType(VT_NULL);
+            rs->Fields->GetItem("prepareStatus")->Value = dbNull;
+            rs->Fields->GetItem("xid")->Value = dbNull;
+        }
+        rs->Update();
     }
     piAdoRecordBinding->Release();
-    rs->Filter = "";     // Remove filter on queueId
-    rs->Requery(adOptionUnspecified);  // Get the entire map again
-
-    // Now delete all the messages on this queue; any message that still has
-    // references from other queue(s) is removed from orphaned.
-    for (std::vector<uint64_t>::iterator i = orphaned.begin();
-         i != orphaned.end();
-         ) {
-        if (remove(*i, queueId))
-            i = orphaned.erase(i);     // There are other refs to message *i
-        else
-            ++i;
-    }
 }
 
 void
 MessageMapRecordset::recover(MessageQueueMap& msgMap)
 {
+    openRs();
     if (rs->BOF && rs->EndOfFile)
         return;   // Nothing to do
     rs->MoveFirst();
@@ -123,7 +224,18 @@ MessageMapRecordset::recover(MessageQueu
                        (LPVOID *)&piAdoRecordBinding);
     piAdoRecordBinding->BindToRecordset(&b);
     while (!rs->EndOfFile) {
-        msgMap[b.messageId].push_back(b.queueId);
+        qpid::store::QueueEntry entry;
+        entry.queueId = b.queueId;
+        if (b.xidStatus == adFldOK && b.xidLength > 0) {
+            entry.xid.assign(b.xid, b.xidLength);
+            entry.tplStatus =
+                b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING
+                                               : QueueEntry::REMOVING;
+        }
+        else {
+            entry.tplStatus = QueueEntry::NONE;
+        }
+        msgMap[b.messageId].push_back(entry);
         rs->MoveNext();
     }
 
@@ -133,6 +245,7 @@ MessageMapRecordset::recover(MessageQueu
 void
 MessageMapRecordset::dump()
 {
+    openRs();
     Recordset::dump();
     if (rs->EndOfFile && rs->BOF)    // No records
         return;

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h Fri Apr 16 20:12:55 2010
@@ -23,7 +23,6 @@
  */
 
 #include <icrsint.h>
-#include <vector>
 #include "Recordset.h"
 #include <qpid/broker/RecoveryManager.h>
 
@@ -38,32 +37,56 @@ namespace ms_sql {
  */
 class MessageMapRecordset : public Recordset {
 
+    // These values are defined in a constraint on the tblMessageMap table.
+    // the prepareStatus column can only be null, 1, or 2.
+    enum { PREPARE_ADD=1, PREPARE_REMOVE=2 };
+
     class MessageMap : public CADORecordBinding {
         BEGIN_ADO_BINDING(MessageMap)
           ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
           ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
+          ADO_FIXED_LENGTH_ENTRY2(3, adTinyInt, prepareStatus, FALSE)
+          ADO_VARIABLE_LENGTH_ENTRY(4, adVarBinary, xid, sizeof(xid),
+                                    xidStatus, xidLength, FALSE)
         END_ADO_BINDING()
 
     public:
         uint64_t messageId;
         uint64_t queueId;
+        uint8_t  prepareStatus;
+        char     xid[512];
+        int      xidStatus;
+        uint32_t xidLength;
     };
 
+    void selectOnXid(const std::string& xid);
+
 public:
+    virtual void open(DatabaseConnection* conn, const std::string& table);
+
     // Add a new mapping
-    void add(uint64_t messageId, uint64_t queueId);
+    void add(uint64_t messageId,
+             uint64_t queueId,
+             const std::string& xid = "");
+
+    // Remove a specific mapping.
+    void remove(uint64_t messageId, uint64_t queueId);
+
+    // Mark the indicated message->queue entry pending removal. The entry
+    // for the mapping is updated to indicate pending removal with the
+    // specified xid.
+    void pendingRemove(uint64_t messageId,
+                       uint64_t queueId,
+                       const std::string& xid);
+
+    // Remove mappings for all messages on a specified queue.
+    void removeForQueue(uint64_t queueId);
+
+    // Commit records recorded as prepared.
+    void commitPrepared(const std::string& xid);
 
-    // Remove a specific mapping. Returns true if the message is still
-    // enqueued on at least one other queue; false if the message no longer
-    // exists on any other queues.
-    bool remove(uint64_t messageId, uint64_t queueId);
-
-    // Remove mappings for all messages on a specified queue. If there are
-    // messages that were only on the specified queue and are, therefore,
-    // orphaned now, return them in the orphaned vector. The orphaned
-    // messages can be deleted permanently as they are not referenced on
-    // any other queues.
-    void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned);
+    // Abort prepared changes.
+    void abortPrepared(const std::string& xid);
 
     // Recover the mappings of message ID -> vector<queue ID>.
     void recover(MessageQueueMap& msgMap);

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp Fri Apr 16 20:12:55 2010
@@ -35,63 +35,35 @@ namespace qpid {
 namespace store {
 namespace ms_sql {
 
-#if 0
-Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs)
-{
-    rs->MoveFirst();
-    setCurrent();
-}
-
-std::pair<uint64_t, BlobAdapter>&
-Recordset::Iterator::dereference() const
-{
-  return const_cast<std::pair<uint64_t, BlobAdapter> >(current);
-}
-
-void
-Recordset::Iterator::increment()
-{
-    rs->MoveNext();
-    setCurrent();
-}
-
-bool
-Recordset::Iterator::equal(const Iterator& x) const
-{
-    return current.first == x.current.first;
-}
 
 void
-Recordset::Iterator::setCurrent()
+Recordset::init(DatabaseConnection* conn, const std::string& table)
 {
-    if (!rs->EndOfFile) {
-        uint64_t id = rs->Fields->Item["persistenceId"]->Value;
-        long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
-        BlobAdapter blob(blobSize);
-        blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
-        current = std::make_pair(id, blob);
-    }
-    else {
-        current.first = 0;
-    }
+    dbConn = conn;
+    TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
+    tableName = table;
 }
-#endif
 
 void
-Recordset::open(DatabaseConnection* conn, const std::string& table)
+Recordset::openRs()
 {
-    _ConnectionPtr p = *conn;
-    TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
     // Client-side cursors needed to get access to newly added
     // identity column immediately. Recordsets need this to get the
     // persistence ID for the broker objects.
     rs->CursorLocation = adUseClient;
-    rs->Open(table.c_str(),
+    _ConnectionPtr p = *dbConn;
+    rs->Open(tableName.c_str(),
              _variant_t((IDispatch *)p, true), 
              adOpenStatic,
              adLockOptimistic,
              adCmdTable);
-    tableName = table;
+}
+
+void
+Recordset::open(DatabaseConnection* conn, const std::string& table)
+{
+    init(conn, table);
+    openRs();
 }
 
 void
@@ -99,7 +71,6 @@ Recordset::close()
 {
     if (rs && rs->State == adStateOpen)
         rs->Close();
-    rs = 0;    
 }
 
 void

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.h?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/Recordset.h Fri Apr 16 20:12:55 2010
@@ -51,46 +51,20 @@ protected:
     DatabaseConnection* dbConn;
     std::string tableName;
 
+    void init(DatabaseConnection* conn, const std::string& table);
+    void openRs();
+
 public:
+    Recordset() : rs(0), dbConn(0) {}
+    virtual ~Recordset() { close(); rs = 0; dbConn = 0; }
 
-#if 0
     /**
-     * Iterator support for walking through the recordset.
-     * If I need to try this again, I'd look at Recordset cloning.
+     * Default open() reads all records into the recordset.
      */
-    class Iterator : public boost::iterator_facade<
-      Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag>
-    {
-    public:
-        Iterator() : rs(0) { }
-        Iterator(Recordset& _rs);
-
-    private:
-        friend class boost::iterator_core_access;
-
-        std::pair<uint64_t, BlobAdapter>& dereference() const;
-        void increment();
-        bool equal(const Iterator& x) const;
-
-        _RecordsetPtr rs;
-        std::pair<uint64_t, BlobAdapter> current;
-
-        void setCurrent();
-    };
-
-    friend class Iterator;
-#endif
-
-    Recordset() : rs(0) {}
-    virtual ~Recordset() { close(); }
-    void open(DatabaseConnection* conn, const std::string& table);
+    virtual void open(DatabaseConnection* conn, const std::string& table);
     void close();
     void requery();
     operator _RecordsetPtr () { return rs; }
-#if 0
-    Iterator begin() { Iterator iter(*this); return iter; }
-    Iterator end() { Iterator iter; return iter; }
-#endif
 
     // Dump table contents; useful for debugging.
     void dump();

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp?rev=935068&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.cpp Fri Apr 16 20:12:55 2010
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SqlTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+SqlTransaction::SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db)
+  : db(_db), transDepth(0)
+{
+}
+
+SqlTransaction::~SqlTransaction()
+{
+    // Roll back anything still open; never let a COM error escape a dtor.
+    try { if (transDepth > 0) this->abort(); } catch (_com_error&) { }
+}
+
+void
+SqlTransaction::begin()
+{
+    // Send the BEGIN to the server first; count the level once it returns.
+    _ConnectionPtr conn = *db;
+    conn->Execute(_bstr_t("BEGIN TRANSACTION"), NULL, adExecuteNoRecords);
+    ++transDepth;
+}
+
+void
+SqlTransaction::commit()
+{
+    // Nothing to commit unless at least one transaction level is open.
+    if (transDepth == 0)
+        return;
+    _ConnectionPtr conn = *db;
+    conn->Execute(_bstr_t("COMMIT TRANSACTION"), NULL, adExecuteNoRecords);
+    --transDepth;
+}
+
+void
+SqlTransaction::abort()
+{
+    // Only roll back when a transaction is open; the depth resets to zero.
+    if (transDepth == 0)
+        return;
+    _ConnectionPtr conn = *db;
+    conn->Execute(_bstr_t("ROLLBACK TRANSACTION"), NULL, adExecuteNoRecords);
+    transDepth = 0;
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h?rev=935068&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/SqlTransaction.h Fri Apr 16 20:12:55 2010
@@ -0,0 +1,67 @@
+#ifndef QPID_STORE_MSSQL_SQLTRANSACTION_H
+#define QPID_STORE_MSSQL_SQLTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class SqlTransaction
+ *
+ * Class representing an SQL transaction.
+ * Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+ * et al, nested transactions are carried out with direct SQL commands.
+ * To ensure the state of this is known, keep track of how deeply the
+ * transactions are nested. This is more of a safety/sanity check since
+ * AMQP doesn't provide nested transactions.
+ */
+class SqlTransaction {
+
+    boost::shared_ptr<DatabaseConnection> db;
+
+    // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+    // et al, nested transactions are carried out with direct SQL commands.
+    // To ensure the state of this is known, keep track of how deeply the
+    // transactions are nested.
+    unsigned int transDepth;
+
+public:
+    SqlTransaction(const boost::shared_ptr<DatabaseConnection>& _db);
+    ~SqlTransaction();
+
+    DatabaseConnection *dbConn() { return db.get(); }
+
+    void begin();
+    void commit();
+    void abort();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_SQLTRANSACTION_H */

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp?rev=935068&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.cpp Fri Apr 16 20:12:55 2010
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "TplRecordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+// Pass a failing HRESULT to _com_issue_error(); successful codes are ignored.
+inline void TESTHR(HRESULT hr)
+{
+    if (FAILED(hr))
+        _com_issue_error(hr);
+}
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+TplRecordset::open(DatabaseConnection* conn, const std::string& table)
+{
+    init(conn, table);  // NOTE(review): presumably records conn/table in the base Recordset - confirm.
+    // The recordset is deliberately not opened here. Most TPL/xid operations
+    // are easier and more efficient to carry out as a single SQL statement.
+}
+
+namespace {
+
+// Run a one-parameter SQL statement on conn, binding xid (as adVarBinary,
+// sized to xid.length()) to the statement's single '?' placeholder. The
+// statement is executed with adExecuteNoRecords, so no rows are returned.
+// A failed CreateInstance() is reported through TESTHR.
+void
+executeWithXid(_ConnectionPtr conn,
+               const std::string& command,
+               const std::string& xid)
+{
+    _CommandPtr cmd = NULL;
+    _ParameterPtr xidVal = NULL;
+
+    TESTHR(cmd.CreateInstance(__uuidof(Command)));
+    TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
+    cmd->ActiveConnection = conn;
+    cmd->CommandText = command.c_str();
+    cmd->CommandType = adCmdText;
+    xidVal->Name = "@xid";
+    xidVal->Type = adVarBinary;
+    xidVal->Size = xid.length();
+    xidVal->Direction = adParamInput;
+    xidVal->Value = BlobEncoder(xid);
+    cmd->Parameters->Append(xidVal);
+    cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
+}
+
+}
+
+// Insert a record holding xid into the table named by tableName.
+void
+TplRecordset::add(const std::string& xid)
+{
+    const std::string command =
+        "INSERT INTO " + tableName + " ( xid ) VALUES ( ? )";
+    _ConnectionPtr p = *dbConn;
+    executeWithXid(p, command, xid);
+}
+
+// Delete the record(s) whose xid column equals xid.
+void
+TplRecordset::remove(const std::string& xid)
+{
+    // Look up the item by its xid
+    const std::string command =
+        "DELETE FROM " + tableName + " WHERE xid = ?";
+    _ConnectionPtr p = *dbConn;
+    executeWithXid(p, command, xid);
+}
+
+// Read every record of the table and insert its xid into xids.
+// The xid bytes are copied out of the SAFEARRAY that ADO returns for the
+// column. A value that is not a byte array, or a failure to lock the
+// array, is reported through _com_issue_error().
+void
+TplRecordset::recover(std::set<std::string>& xids)
+{
+    openRs();
+    if (rs->BOF && rs->EndOfFile)
+        return;   // Nothing to do
+    rs->MoveFirst();
+    while (!rs->EndOfFile) {
+        _variant_t wxid = rs->Fields->Item["xid"]->Value;
+        // Only a byte SAFEARRAY has a parray that can be locked; anything
+        // else (e.g. a NULL column value) must not be dereferenced.
+        if (wxid.vt != (VT_ARRAY | VT_UI1))
+            _com_issue_error(DISP_E_TYPEMISMATCH);
+        char *xidBytes = 0;
+        TESTHR(SafeArrayAccessData(wxid.parray, (void **)&xidBytes));
+        std::string xid;
+        try {
+            xid.assign(xidBytes, rs->Fields->Item["xid"]->ActualSize);
+        }
+        catch (...) {
+            // Never leave the array locked if the copy fails.
+            SafeArrayUnaccessData(wxid.parray);
+            throw;
+        }
+        SafeArrayUnaccessData(wxid.parray);
+        xids.insert(xid);
+        rs->MoveNext();
+    }
+}
+
+// Log the table contents: the base-class dump first, then one notice-level
+// line per record showing its xid column converted to text.
+void
+TplRecordset::dump()
+{
+    Recordset::dump();
+    const bool noRecords = rs->EndOfFile && rs->BOF;
+    if (noRecords)
+        return;
+
+    for (rs->MoveFirst(); !rs->EndOfFile; rs->MoveNext()) {
+        _bstr_t xidText = rs->Fields->Item["xid"]->Value;
+        QPID_LOG(notice, "  -> " << (const char *)xidText);
+    }
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h?rev=935068&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/TplRecordset.h Fri Apr 16 20:12:55 2010
@@ -0,0 +1,58 @@
+#ifndef QPID_STORE_MSSQL_TPLRECORDSET_H
+#define QPID_STORE_MSSQL_TPLRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class TplRecordset
+ *
+ * Class for the TPL (Transaction Prepared List) records.
+ *
+ * Each record holds the xid of a prepared transaction. add() and remove()
+ * each issue a single parameterized INSERT/DELETE statement instead of
+ * editing an open recordset; recover() opens the recordset and reads every
+ * stored xid back.
+ */
+class TplRecordset : public Recordset {
+protected:
+
+public:
+    virtual void open(DatabaseConnection* conn, const std::string& table);  // Sets up conn/table; the recordset is not opened yet.
+
+    void add(const std::string& xid);  // Insert a record holding xid.
+
+    // Remove the record(s) whose xid column matches the given xid.
+    void remove(const std::string& xid);
+
+    // Recover prepared transaction XIDs; each one found is inserted into xids.
+    void recover(std::set<std::string>& xids);
+
+    // Dump table contents to the log at notice level; useful for debugging.
+    void dump();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_TPLRECORDSET_H */

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp?rev=935068&r1=935067&r2=935068&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp Fri Apr 16 20:12:55 2010
@@ -41,13 +41,25 @@ VariantHelper<Wrapped>::operator const _
 // Specialization for using _variant_t to wrap a std::string
 VariantHelper<std::string>::VariantHelper(const std::string &init)
 {
-    var.SetString(init.c_str());
+    // A non-empty string goes through SetString(); an empty one is wrapped
+    // as a VT_BSTR variant whose bstrVal is NULL.
+    if (!init.empty()) {
+        var.SetString(init.c_str());
+        return;
+    }
+    var.vt = VT_BSTR;
+    var.bstrVal = NULL;
 }
 
 VariantHelper<std::string>&
 VariantHelper<std::string>::operator=(const std::string &rhs)
 {
-    var.SetString(rhs.c_str());
+    // Replace the wrapped value with rhs and return *this. An empty rhs is
+    // stored as a VT_BSTR variant with a NULL bstrVal; anything else goes
+    // through SetString().
+    if (rhs.empty()) {
+        // Release whatever var holds now before writing the raw fields;
+        // overwriting vt/bstrVal directly would leak a previously set BSTR.
+        var.Clear();
+        var.vt = VT_BSTR;
+        var.bstrVal = NULL;
+    }
+    else {
+        var.SetString(rhs.c_str());
+    }
     return *this;
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org