You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/28 16:36:40 UTC

svn commit: r830613 [2/2] - in /qpid/branches/0.5.x-dev/qpid/cpp/src/qpid: broker/ store/ store/ms-sql/

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,989 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StorageProvider.h>
+#include "AmqpTransaction.h"
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "MessageMapRecordset.h"
+#include "MessageRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+        no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+
+// Table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblMessage("tblMessage");
+const std::string TblMessageMap("tblMessageMap");
+const std::string TblQueue("tblQueue");
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MSSqlProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as
+ * the backend data store for Qpid.
+ */
+class MSSqlProvider : public qpid::store::StorageProvider
+{
+protected:
+    void finalizeMe();
+
+    void dump();
+
+public:
+    MSSqlProvider();
+    ~MSSqlProvider();
+
+    virtual qpid::Options* getOptions() { return &options; }
+
+    virtual void earlyInitialize (Plugin::Target& target);
+    virtual void initialize(Plugin::Target& target);
+
+    /**
+     * Receive notification that this provider is the one that will actively
+     * handle provider storage for the target. If the provider is to be used,
+     * this method will be called after earlyInitialize() and before any
+     * recovery operations (recovery, in turn, precedes call to initialize()).
+     */
+    virtual void activate(MessageStorePlugin &store);
+
+    /**
+     * @name Methods inherited from qpid::broker::MessageStore
+     */
+    //@{
+    /**
+     * If called after init() but before recovery, will discard the database
+     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+     * is true, the content of the store dir will be moved to a backup dir
+     * inside the store dir. This is used when cluster nodes recover and must
+     * get thier content from a cluster sync rather than directly fromt the
+     * store.
+     *
+     * @param pushDownStoreFiles If true, will move content of the store dir
+     *                           into a subdir, leaving the store dir
+     *                           otherwise empty.
+     */
+    virtual void truncateInit(const bool pushDownStoreFiles = false);
+
+    /**
+     * Record the existence of a durable queue
+     */
+    virtual void create(PersistableQueue& queue,
+                        const qpid::framing::FieldTable& args);
+    /**
+     * Destroy a durable queue
+     */
+    virtual void destroy(PersistableQueue& queue);
+
+    /**
+     * Record the existence of a durable exchange
+     */
+    virtual void create(const PersistableExchange& exchange,
+                        const qpid::framing::FieldTable& args);
+    /**
+     * Destroy a durable exchange
+     */
+    virtual void destroy(const PersistableExchange& exchange);
+
+    /**
+     * Record a binding
+     */
+    virtual void bind(const PersistableExchange& exchange,
+                      const PersistableQueue& queue,
+                      const std::string& key,
+                      const qpid::framing::FieldTable& args);
+
+    /**
+     * Forget a binding
+     */
+    virtual void unbind(const PersistableExchange& exchange,
+                        const PersistableQueue& queue,
+                        const std::string& key,
+                        const qpid::framing::FieldTable& args);
+
+    /**
+     * Record generic durable configuration
+     */
+    virtual void create(const PersistableConfig& config);
+
+    /**
+     * Destroy generic durable configuration
+     */
+    virtual void destroy(const PersistableConfig& config);
+
+    /**
+     * Stores a messages before it has been enqueued
+     * (enqueueing automatically stores the message so this is
+     * only required if storage is required prior to that
+     * point). If the message has not yet been stored it will
+     * store the headers as well as any content passed in. A
+     * persistence id will be set on the message which can be
+     * used to load the content or to append to it.
+     */
+    virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+    /**
+     * Destroys a previously staged message. This only needs
+     * to be called if the message is never enqueued. (Once
+     * enqueued, deletion will be automatic when the message
+     * is dequeued from all queues it was enqueued onto).
+     */
+    virtual void destroy(PersistableMessage& msg);
+
+    /**
+     * Appends content to a previously staged message
+     */
+    virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+                               const std::string& data);
+
+    /**
+     * Loads (a section) of content data for the specified
+     * message (previously stored through a call to stage or
+     * enqueue) into data. The offset refers to the content
+     * only (i.e. an offset of 0 implies that the start of the
+     * content should be loaded, not the headers or related
+     * meta-data).
+     */
+    virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+                             const boost::intrusive_ptr<const PersistableMessage>& msg,
+                             std::string& data,
+                             uint64_t offset,
+                             uint32_t length);
+
+    /**
+     * Enqueues a message, storing the message if it has not
+     * been previously stored and recording that the given
+     * message is on the given queue.
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
+     *
+     * @param msg the message to enqueue
+     * @param queue the name of the queue onto which it is to be enqueued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue);
+
+    /**
+     * Dequeues a message, recording that the given message is
+     * no longer on the given queue and deleting the message
+     * if it is no longer on any other queue.
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
+     *
+     * @param msg the message to dequeue
+     * @param queue the name of the queue from which it is to be dequeued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue);
+
+    /**
+     * Flushes all async messages to disk for the specified queue
+     *
+     * Note: this is a no-op for this provider.
+     *
+     * @param queue the name of the queue from which it is to be dequeued
+     */
+    virtual void flush(const PersistableQueue& queue) {};
+
+    /**
+     * Returns the number of outstanding AIO's for a given queue
+     *
+     * If 0, than all the enqueue / dequeues have been stored
+     * to disk
+     *
+     * @param queue the name of the queue to check for outstanding AIO
+     */
+    virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+        {return 0;}
+    //@}
+
+    /**
+     * @name Methods inherited from qpid::broker::TransactionalStore
+     */
+    //@{
+    virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+    virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+    virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+    virtual void commit(qpid::broker::TransactionContext& txn);
+    virtual void abort(qpid::broker::TransactionContext& txn);
+
+    // @TODO This maybe should not be in TransactionalStore
+    virtual void collectPreparedXids(std::set<std::string>& xids) {}
+    //@}
+
+    virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+    virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+                                  ExchangeMap& exchangeMap);
+    virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+                               QueueMap& queueMap);
+    virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+                                 const ExchangeMap& exchangeMap);
+    virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+                                 MessageMap& messageMap,
+                                 MessageQueueMap& messageQueueMap);
+
+private:
+    struct ProviderOptions : public qpid::Options
+    {
+        std::string connectString;
+        std::string catalogName;
+
+        ProviderOptions(const std::string &name)
+            : qpid::Options(name),
+              catalogName("QpidStore")
+        {
+            const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
+            TCHAR myName[NAMELEN];
+            DWORD myNameLen = NAMELEN;
+            GetComputerName(myName, &myNameLen);
+            connectString = "Data Source=";
+            connectString += myName;
+            connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+            addOptions()
+                ("connect",
+                 qpid::optValue(connectString, "STRING"),
+                 "Connection string for the database to use. Will prepend "
+                 "Provider=SQLOLEDB;")
+                ("catalog",
+                 qpid::optValue(catalogName, "DB NAME"),
+                 "Catalog (database) name")
+                ;
+        }
+    };
+    ProviderOptions options;
+
+    // Each thread has a separate connection to the database and also needs
+    // to manage its COM initialize/finalize individually. This is done by
+    // keeping a thread-specific State.
+    boost::thread_specific_ptr<State> dbState;
+
+    State *initState();
+    DatabaseConnection *initConnection(void);
+    void createDb(_ConnectionPtr conn, const std::string &name);
+};
+
+static MSSqlProvider static_instance_registers_plugin;
+
+void
+MSSqlProvider::finalizeMe()
+{
+    dbState.reset();
+}
+
+MSSqlProvider::MSSqlProvider()
+    : options("MS SQL Provider options")
+{
+}
+
+MSSqlProvider::~MSSqlProvider()
+{
+}
+
+void
+MSSqlProvider::earlyInitialize(Plugin::Target &target)
+{
+    MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+    if (store) {
+        // If the database init fails, report it and don't register; give
+        // the rest of the broker a chance to run.
+        //
+        // Don't try to initConnection() since that will fail if the
+        // database doesn't exist. Instead, try to open a connection without
+        // a database name, then search for the database. There's still a
+        // chance this provider won't be selected for the store too, so be
+        // be sure to close the database connection before return to avoid
+        // leaving a connection up that will not be used.
+        try {
+            std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
+            db->open(options.connectString, "");
+            _ConnectionPtr conn(*db);
+            _RecordsetPtr pCatalogs = NULL;
+            VariantHelper<std::string> catalogName(options.catalogName);
+            pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+            if (pCatalogs->EndOfFile) {
+                // Database doesn't exist; create it
+                QPID_LOG(notice,
+                         "MSSQL: Creating database " + options.catalogName);
+                createDb(conn, options.catalogName);
+            }
+            else {
+                QPID_LOG(notice,
+                         "MSSQL: Database located: " + options.catalogName);
+            }
+            if (pCatalogs) {
+                if (pCatalogs->State == adStateOpen)
+                    pCatalogs->Close();
+                pCatalogs = 0;
+            }
+            db->close();
+            store->providerAvailable("MSSQL", this);
+        }
+        catch (qpid::Exception &e) {
+            QPID_LOG(error, e.what());
+            return;
+        }
+        store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this));
+    }
+}
+
+void
+MSSqlProvider::initialize(Plugin::Target& target)
+{
+}
+
+void
+MSSqlProvider::activate(MessageStorePlugin &store)
+{
+  QPID_LOG(info, "MS SQL Provider is up");
+}
+
+void
+MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
+{
+}
+
+void
+MSSqlProvider::create(PersistableQueue& queue,
+                      const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsQueues;
+        db->beginTransaction();
+        rsQueues.open(db, TblQueue);
+        rsQueues.add(queue);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error creating queue " + queue.getName(), e);
+    }
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MSSqlProvider::destroy(PersistableQueue& queue)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsQueues;
+        BindingRecordset rsBindings;
+        db->beginTransaction();
+        rsQueues.open(db, TblQueue);
+        rsBindings.open(db, TblBinding);
+        rsQueues.remove(queue);
+        rsBindings.remove(queue.getName());
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error deleting queue " + queue.getName(), e);
+    }
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MSSqlProvider::create(const PersistableExchange& exchange,
+                      const qpid::framing::FieldTable& args)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsExchanges;
+        db->beginTransaction();
+        rsExchanges.open(db, TblExchange);
+        rsExchanges.add(exchange);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error creating exchange " + exchange.getName(), e);
+    }
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MSSqlProvider::destroy(const PersistableExchange& exchange)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsExchanges;
+        BindingRecordset rsBindings;
+        db->beginTransaction();
+        rsExchanges.open(db, TblExchange);
+        rsBindings.open(db, TblBinding);
+        rsExchanges.remove(exchange);
+        rsBindings.remove(exchange.getPersistenceId());
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error deleting exchange " + exchange.getName(), e);
+    }
+}
+
+/**
+ * Record a binding
+ */
+void
+MSSqlProvider::bind(const PersistableExchange& exchange,
+                    const PersistableQueue& queue,
+                    const std::string& key,
+                    const qpid::framing::FieldTable& args)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BindingRecordset rsBindings;
+        db->beginTransaction();
+        rsBindings.open(db, TblBinding);
+        rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error binding exchange " + exchange.getName() +
+                           " to queue " + queue.getName(), e);
+    }
+}
+
+/**
+ * Forget a binding
+ */
+void
+MSSqlProvider::unbind(const PersistableExchange& exchange,
+                      const PersistableQueue& queue,
+                      const std::string& key,
+                      const qpid::framing::FieldTable& args)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BindingRecordset rsBindings;
+        db->beginTransaction();
+        rsBindings.open(db, TblBinding);
+        rsBindings.remove(exchange.getPersistenceId(),
+                          queue.getName(),
+                          key,
+                          args);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error unbinding exchange " + exchange.getName() +
+                           " from queue " + queue.getName(), e);
+    }
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MSSqlProvider::create(const PersistableConfig& config)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsConfigs;
+        db->beginTransaction();
+        rsConfigs.open(db, TblConfig);
+        rsConfigs.add(config);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error creating config " + config.getName(), e);
+    }
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MSSqlProvider::destroy(const PersistableConfig& config)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsConfigs;
+        db->beginTransaction();
+        rsConfigs.open(db, TblConfig);
+        rsConfigs.remove(config);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error deleting config " + config.getName(), e);
+    }
+}
+
+/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+void
+MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        MessageRecordset rsMessages;
+        db->beginTransaction();
+        rsMessages.open(db, TblMessage);
+        rsMessages.add(msg);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error staging message", e);
+    }  
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MSSqlProvider::destroy(PersistableMessage& msg)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        BlobRecordset rsMessages;
+        db->beginTransaction();
+        rsMessages.open(db, TblMessage);
+        rsMessages.remove(msg);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error deleting message", e);
+    }
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+                             const std::string& data)
+{
+    DatabaseConnection *db = initConnection();
+    try {
+        MessageRecordset rsMessages;
+        db->beginTransaction();
+        rsMessages.open(db, TblMessage);
+        rsMessages.append(msg, data);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        db->rollbackTransaction();
+        throw ADOException("Error appending to message", e);
+    }  
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+                           const boost::intrusive_ptr<const PersistableMessage>& msg,
+                           std::string& data,
+                           uint64_t offset,
+                           uint32_t length)
+{
+    // SQL store keeps all messages in one table, so we don't need the
+    // queue reference.
+    DatabaseConnection *db = initConnection();
+    try {
+        MessageRecordset rsMessages;
+        rsMessages.open(db, TblMessage);
+        rsMessages.loadContent(msg, data, offset, length);
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error loading message content", e);
+    }  
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ */
+void
+MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+                       const boost::intrusive_ptr<PersistableMessage>& msg,
+                       const PersistableQueue& queue)
+{
+    AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+    if (atxn == 0)
+        throw qpid::broker::InvalidTransactionContextException();
+    (void)initState();     // Ensure this thread is initialized
+    try {
+        atxn->begin();
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error queuing message", e);
+    }  
+
+    try {
+        if (msg->getPersistenceId() == 0) {    // Message itself not yet saved
+            MessageRecordset rsMessages;
+            rsMessages.open(atxn->dbConn(), TblMessage);
+            rsMessages.add(msg);
+        }
+        MessageMapRecordset rsMap;
+        rsMap.open(atxn->dbConn(), TblMessageMap);
+        rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
+        atxn->commit();
+    }
+    catch(_com_error &e) {
+        atxn->abort();
+        throw ADOException("Error queuing message", e);
+    }  
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ */
+void
+MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+                       const boost::intrusive_ptr<PersistableMessage>& msg,
+                       const PersistableQueue& queue)
+{
+    AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
+    if (atxn == 0)
+        throw qpid::broker::InvalidTransactionContextException();
+    (void)initState();     // Ensure this thread is initialized
+    try {
+        atxn->begin();
+    }
+    catch(_com_error &e) {
+        throw ADOException("Error queuing message", e);
+    }  
+    try {
+        MessageMapRecordset rsMap;
+        rsMap.open(atxn->dbConn(), TblMessageMap);
+        bool more = rsMap.remove(msg->getPersistenceId(),
+                                 queue.getPersistenceId());
+        if (!more) {
+            MessageRecordset rsMessages;
+            rsMessages.open(atxn->dbConn(), TblMessage);
+            rsMessages.remove(msg);
+        }
+        atxn->commit();
+    }
+    catch(_com_error &e) {
+        atxn->abort();
+        throw ADOException("Error dequeuing message", e);
+    }  
+}
+
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlProvider::begin()
+{
+    (void)initState();     // Ensure this thread is initialized
+
+    // Transactions are associated with the Connection, so this transaction
+    // context needs its own connection. At the time of writing, single-phase
+    // transactions are dealt with completely on one thread, so we really
+    // could just use the thread-specific DatabaseConnection for this.
+    // However, that would introduce an ugly, hidden coupling, so play
+    // it safe and handle this just like a TPC transaction, which actually
+    // can be prepared and committed/aborted from different threads,
+    // making it a bad idea to try using the thread-local DatabaseConnection.
+    std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+    db->open(options.connectString, options.catalogName);
+    std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
+    tx->begin();
+    std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
+    return tc;
+}
+
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlProvider::begin(const std::string& xid)
+{
+    (void)initState();     // Ensure this thread is initialized
+    std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+    db->open(options.connectString, options.catalogName);
+    std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
+    tx->begin();
+    std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
+    return tc;
+}
+
+void
+MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+}
+
+void
+MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
+{
+    (void)initState();     // Ensure this thread is initialized
+    AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
+    if (atxn == 0)
+        throw qpid::broker::InvalidTransactionContextException();
+    atxn->commit();
+}
+
+void
+MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
+{
+    (void)initState();     // Ensure this thread is initialized
+    AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
+    if (atxn == 0)
+        throw qpid::broker::InvalidTransactionContextException();
+    atxn->abort();
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+void
+MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+    DatabaseConnection *db = initConnection();
+    BlobRecordset rsConfigs;
+    rsConfigs.open(db, TblConfig);
+    _RecordsetPtr p = (_RecordsetPtr)rsConfigs;
+    if (p->BOF && p->EndOfFile)
+        return;   // Nothing to do
+    p->MoveFirst();
+    while (!p->EndOfFile) {
+        uint64_t id = p->Fields->Item["persistenceId"]->Value;
+        long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        // Recreate the Config instance and reset its ID.
+        broker::RecoverableConfig::shared_ptr config =
+            recoverer.recoverConfig(blob);
+        config->setPersistenceId(id);
+        p->MoveNext();
+    }
+}
+
+void
+MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+                                ExchangeMap& exchangeMap)
+{
+    DatabaseConnection *db = initConnection();
+    BlobRecordset rsExchanges;
+    rsExchanges.open(db, TblExchange);
+    _RecordsetPtr p = (_RecordsetPtr)rsExchanges;
+    if (p->BOF && p->EndOfFile)
+        return;   // Nothing to do
+    p->MoveFirst();
+    while (!p->EndOfFile) {
+        uint64_t id = p->Fields->Item["persistenceId"]->Value;
+        long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        // Recreate the Exchange instance, reset its ID, and remember the
+        // ones restored for matching up when recovering bindings.
+        broker::RecoverableExchange::shared_ptr exchange =
+            recoverer.recoverExchange(blob);
+        exchange->setPersistenceId(id);
+        exchangeMap[id] = exchange;
+        p->MoveNext();
+    }
+}
+
+void
+MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+                             QueueMap& queueMap)
+{
+    DatabaseConnection *db = initConnection();
+    BlobRecordset rsQueues;
+    rsQueues.open(db, TblQueue);
+    _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+    if (p->BOF && p->EndOfFile)
+        return;   // Nothing to do
+    p->MoveFirst();
+    while (!p->EndOfFile) {
+        uint64_t id = p->Fields->Item["persistenceId"]->Value;
+        long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        // Recreate the Queue instance and reset its ID.
+        broker::RecoverableQueue::shared_ptr queue =
+            recoverer.recoverQueue(blob);
+        queue->setPersistenceId(id);
+        queueMap[id] = queue;
+        p->MoveNext();
+    }
+}
+
+void
+MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+                               const ExchangeMap& exchangeMap)
+{
+    DatabaseConnection *db = initConnection();
+    BindingRecordset rsBindings;
+    rsBindings.open(db, TblBinding);
+    rsBindings.recover(recoverer, exchangeMap);
+}
+
+void
+MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+                               MessageMap& messageMap,
+                               MessageQueueMap& messageQueueMap)
+{
+    DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
+    rsMessages.open(db, TblMessage);
+    rsMessages.recover(recoverer, messageMap);
+
+    MessageMapRecordset rsMessageMaps;
+    rsMessageMaps.open(db, TblMessageMap);
+    rsMessageMaps.recover(messageQueueMap);
+}
+
+////////////// Internal Methods
+
+State *
+MSSqlProvider::initState()
+{
+    State *state = dbState.get();   // See if thread has initialized
+    if (!state) {
+        state = new State;
+        dbState.reset(state);
+    }
+    return state;
+}
+  
+DatabaseConnection *
+MSSqlProvider::initConnection(void)
+{
+    State *state = initState();
+    if (state->dbConn != 0)
+        return state->dbConn;    // And the DatabaseConnection is set up too
+    std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+    db->open(options.connectString, options.catalogName);
+    state->dbConn = db.release();
+    return state->dbConn;
+}
+
+void
+MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
+{
+    const std::string dbCmd = "CREATE DATABASE " + name;
+    const std::string useCmd = "USE " + name;
+    const std::string tableCmd = "CREATE TABLE ";
+    const std::string colSpecs =
+        " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+        "  fieldTableBlob varbinary(MAX) NOT NULL)";
+    const std::string bindingSpecs =
+        " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+        "  queueName varchar(255) NOT NULL,"
+        "  routingKey varchar(255),"
+        "  fieldTableBlob varbinary(MAX))";
+    const std::string messageMapSpecs =
+        " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
+        "  queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)";
+    _variant_t unused;
+     _bstr_t dbStr = dbCmd.c_str();
+    conn->Execute(dbStr, &unused, adExecuteNoRecords);
+    _bstr_t useStr = useCmd.c_str();
+    conn->Execute(useStr, &unused, adExecuteNoRecords);
+    std::string makeTable = tableCmd + TblQueue + colSpecs;
+    _bstr_t makeTableStr = makeTable.c_str();
+    conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+    makeTable = tableCmd + TblExchange + colSpecs;
+    makeTableStr = makeTable.c_str();
+    conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+    makeTable = tableCmd + TblConfig + colSpecs;
+    makeTableStr = makeTable.c_str();
+    conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+    makeTable = tableCmd + TblMessage + colSpecs;
+    makeTableStr = makeTable.c_str();
+    conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+    makeTable = tableCmd + TblBinding + bindingSpecs;
+    makeTableStr = makeTable.c_str();
+    conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+    makeTable = tableCmd + TblMessageMap + messageMapSpecs;
+    makeTableStr = makeTable.c_str();
+    conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+}
+
+void
+MSSqlProvider::dump()
+{
+  // dump all db records to qpid_log
+  QPID_LOG(notice, "DB Dump: (not dumping anything)");
+  //  rsQueues.dump();
+}
+
+
+}}} // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/StorageProvider.h>
+
+#include "MessageMapRecordset.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+MessageMapRecordset::add(uint64_t messageId, uint64_t queueId)
+{
+    rs->AddNew();
+    rs->Fields->GetItem("messageId")->Value = messageId;
+    rs->Fields->GetItem("queueId")->Value = queueId;
+    rs->Update();
+}
+
+bool
+MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
+{
+    // Look up all mappings for the specified message. Then scan
+    // for the specified queue and keep track of whether or not the
+    // message exists on any queue we are not looking for a well.
+    std::ostringstream filter;
+    filter << "messageId = " << messageId << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    MessageMap m;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&m);
+    bool moreEntries = false, deleted = false;
+    // If the desired mapping gets deleted, and we already know there are
+    // other mappings for the message, don't bother finishing the scan.
+    while (!rs->EndOfFile && !(deleted && moreEntries)) {
+        if (m.queueId == queueId) {
+            rs->Delete(adAffectCurrent);
+            rs->Update();
+            deleted = true;
+        }
+        else {
+            moreEntries = true;
+        }
+        rs->MoveNext();
+    }
+    piAdoRecordBinding->Release();
+    return moreEntries;
+}
+
+void
+MessageMapRecordset::recover(MessageQueueMap& msgMap)
+{
+    if (rs->BOF && rs->EndOfFile)
+        return;   // Nothing to do
+    rs->MoveFirst();
+    MessageMap b;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&b);
+    while (!rs->EndOfFile) {
+        msgMap[b.messageId].push_back(b.queueId);
+        rs->MoveNext();
+    }
+
+    piAdoRecordBinding->Release();
+}
+
+void
+MessageMapRecordset::dump()
+{
+    Recordset::dump();
+    if (rs->EndOfFile && rs->BOF)    // No records
+        return;
+    rs->MoveFirst();
+
+    MessageMap m;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&m);
+   
+    while (!rs->EndOfFile) {
+        QPID_LOG(notice, "msg " << m.messageId << " on queue " << m.queueId);
+        rs->MoveNext();
+    }
+
+    piAdoRecordBinding->Release();
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,69 @@
+#ifndef QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H
+#define QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "Recordset.h"
+#include <qpid/broker/RecoveryManager.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MessageMapRecordset
+ *
+ * Class for the message map (message -> queue) records.
+ */
+class MessageMapRecordset : public Recordset {
+
+    class MessageMap : public CADORecordBinding {
+        BEGIN_ADO_BINDING(MessageMap)
+          ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
+          ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE)
+        END_ADO_BINDING()
+
+    public:
+        uint64_t messageId;
+        uint64_t queueId;
+    };
+
+public:
+    // Add a new mapping
+    void add(uint64_t messageId, uint64_t queueId);
+
+    // Remove a specific mapping. Returns true if the message is still
+    // enqueued on at least one other queue; false if the message no longer
+    // exists on any other queues.
+    bool remove(uint64_t messageId, uint64_t queueId);
+
+    // Recover the mappings of message ID -> vector<queue ID>.
+    void recover(MessageQueueMap& msgMap);
+
+    // Dump table contents; useful for debugging.
+    void dump();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_MESSAGEMAPRECORDSET_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "MessageRecordset.h"
+#include "BlobAdapter.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+class qpid::broker::PersistableMessage;
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+    BlobEncoder blob (msg);   // Marshall headers and content to a blob
+    rs->AddNew();
+    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+    rs->Update();
+}
+
+void
+MessageRecordset::append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                         const std::string& data)
+{
+    // Look up the message by its Id
+    std::ostringstream filter;
+    filter << "persistenceId = " << msg->getPersistenceId() << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (rs->RecordCount == 0) {
+        throw Exception("Can't append to message not stored in database");
+    }
+    BlobEncoder blob (data);   // Marshall string data to a blob
+    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+    rs->Update();
+}
+
+void
+MessageRecordset::remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg)
+{
+    BlobRecordset::remove(msg->getPersistenceId());
+}
+
+void
+MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                              std::string& data,
+                              uint64_t offset,
+                              uint32_t length)
+{
+    // Look up the message by its Id
+    std::ostringstream filter;
+    filter << "persistenceId = " << msg->getPersistenceId() << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (rs->RecordCount == 0) {
+        throw Exception("Can't load message not stored in database");
+    }
+
+    // NOTE! If this code needs to change, please verify the encoding
+    // code in BlobEncoder.
+    long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+    uint32_t headerSize;
+    const size_t headerFieldLength = sizeof(headerSize);
+    BlobAdapter blob(headerFieldLength);
+    blob =
+        rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
+    headerSize = ((qpid::framing::Buffer&)blob).getLong();
+
+    // GetChunk always begins reading where the previous GetChunk left off,
+    // so we can't just tell it to ignore the header and read the data.
+    // So, read the header plus the offset, plus the desired data, then
+    // copy the desired data to the supplied string. If this ends up asking
+    // for more than is available in the field, reduce it to what's there.
+    long getSize = headerSize + offset + length;
+    if (getSize + (long)headerFieldLength > blobSize) {
+        size_t reduce = (getSize + headerFieldLength) - blobSize;
+        getSize -= reduce;
+        length -= reduce;
+    }
+    BlobAdapter header_plus(getSize);
+    header_plus = rs->Fields->Item["fieldTableBlob"]->GetChunk(getSize);
+    uint8_t *throw_away = new uint8_t[headerSize + offset];
+    ((qpid::framing::Buffer&)header_plus).getRawData(throw_away, headerSize + offset);
+    delete throw_away;
+    ((qpid::framing::Buffer&)header_plus).getRawData(data, length);
+}
+
+void
+MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
+                          std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap)
+{
+    if (rs->BOF && rs->EndOfFile)
+        return;   // Nothing to do
+    rs->MoveFirst();
+    Binding b;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&b);
+    while (!rs->EndOfFile) {
+        // The blob was written as normal, but with the header length
+        // prepended in a uint32_t. Due to message staging threshold
+        // limits, the header may be all that's read in; get it first,
+        // recover that message header, then see if the rest is needed.
+        //
+        // NOTE! If this code needs to change, please verify the encoding
+        // code in BlobEncoder.
+        long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+        uint32_t headerSize;
+        const size_t headerFieldLength = sizeof(headerSize);
+        BlobAdapter blob(headerFieldLength);
+        blob =
+          rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
+        headerSize = ((qpid::framing::Buffer&)blob).getLong();
+        BlobAdapter header(headerSize);
+        header = rs->Fields->Item["fieldTableBlob"]->GetChunk(headerSize);
+        broker::RecoverableMessage::shared_ptr msg;
+        msg = recoverer.recoverMessage(header);
+        msg->setPersistenceId(b.messageId);
+        messageMap[b.messageId] = msg;
+
+        // Now, do we need the rest of the content?
+        long contentLength = blobSize - headerFieldLength - headerSize;
+        if (msg->loadContent(contentLength)) {
+            BlobAdapter content(contentLength);
+             content =
+                rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength);
+            msg->decodeContent(content);
+        }
+        rs->MoveNext();
+    }
+
+    piAdoRecordBinding->Release();
+}
+
+void
+MessageRecordset::dump()
+{
+    Recordset::dump();
+    if (rs->EndOfFile && rs->BOF)    // No records
+        return;
+    rs->MoveFirst();
+
+    Binding b;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&b);
+   
+    while (VARIANT_FALSE == rs->EndOfFile) {
+        QPID_LOG(notice, "Msg " << b.messageId);
+        rs->MoveNext();
+    }
+
+    piAdoRecordBinding->Release();
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,85 @@
+#ifndef QPID_STORE_MSSQL_MESSAGERECORDSET_H
+#define QPID_STORE_MSSQL_MESSAGERECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "BlobRecordset.h"
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/broker/RecoveryManager.h>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class MessageRecordset
+ *
+ * Class for storing and recovering messages. Messages are primarily blobs
+ * and handled similarly. However, messages larger than the staging threshold
+ * are not contained completely in memory; they're left mostly in the store
+ * and the header is held in memory. So when the message "blob" is saved,
+ * an additional size-of-the-header field is prepended to the blob.
+ * On recovery, the size-of-the-header is used to get only what's needed
+ * until it's determined if the entire message is to be recovered to memory.
+ */
+class MessageRecordset : public BlobRecordset {
+    class Binding : public CADORecordBinding {
+        BEGIN_ADO_BINDING(Binding)
+          ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
+        END_ADO_BINDING()
+
+    public:
+        uint64_t messageId;
+    };
+
+public:
+    // Store a message. Store the header size (4 bytes) then the regular
+    // blob comprising the message.
+    void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+    // Append additional content to an existing message.
+    void append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                const std::string& data);
+
+    // Remove an existing message
+    void remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg);
+
+    // Load all or part of a stored message. This skips the header parts and
+    // loads content.
+    void loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                     std::string& data,
+                     uint64_t offset,
+                     uint32_t length);
+
+    // Recover messages and save a map of those recovered.
+    void recover(qpid::broker::RecoveryManager& recoverer,
+                 std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap);
+
+    // Dump table contents; useful for debugging.
+    void dump();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_MESSAGERECORDSET_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "Recordset.h"
+#include "BlobEncoder.h"
+#include "DatabaseConnection.h"
+#include "VariantHelper.h"
+
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+#if 0
+Recordset::Iterator::Iterator(Recordset& _rs) : rs(_rs)
+{
+    rs->MoveFirst();
+    setCurrent();
+}
+
+std::pair<uint64_t, BlobAdapter>&
+Recordset::Iterator::dereference() const
+{
+  return const_cast<std::pair<uint64_t, BlobAdapter> >(current);
+}
+
+void
+Recordset::Iterator::increment()
+{
+    rs->MoveNext();
+    setCurrent();
+}
+
+bool
+Recordset::Iterator::equal(const Iterator& x) const
+{
+    return current.first == x.current.first;
+}
+
+void
+Recordset::Iterator::setCurrent()
+{
+    if (!rs->EndOfFile) {
+        uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+        long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        current = std::make_pair(id, blob);
+    }
+    else {
+        current.first = 0;
+    }
+}
+#endif
+
+void
+Recordset::open(DatabaseConnection* conn, const std::string& table)
+{
+    _ConnectionPtr p = *conn;
+    TESTHR(rs.CreateInstance(__uuidof(::Recordset)));
+    rs->Open(table.c_str(),
+             _variant_t((IDispatch *)p, true), 
+             adOpenKeyset,
+             adLockOptimistic,
+             adCmdTable);
+    tableName = table;
+}
+
+void
+Recordset::close()
+{
+    if (rs && rs->State == adStateOpen)
+        rs->Close();
+    rs = 0;    
+}
+
+void
+Recordset::requery()
+{
+    // Restore the recordset to reflect all current records.
+    rs->Filter = "";
+    rs->Requery(-1);
+}
+
+void
+Recordset::dump()
+{
+    long count = rs->RecordCount;
+    QPID_LOG(notice, "DB Dump: " + tableName <<
+                     ": " << count << " records");
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,101 @@
+#ifndef QPID_STORE_MSSQL_RECORDSET_H
+#define QPID_STORE_MSSQL_RECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+        no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+#include <comutil.h>
+#include <string>
+#if 0
+#include <utility>
+#endif
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class Recordset
+ *
+ * Represents an ADO Recordset, abstracting out the common operations needed
+ * on the common tables used that have 2 fields, persistence ID and blob.
+ */
+class Recordset {
+protected:
+    _RecordsetPtr rs;
+    DatabaseConnection* dbConn;
+    std::string tableName;
+
+public:
+
+#if 0
+    /**
+     * Iterator support for walking through the recordset.
+     * If I need to try this again, I'd look at Recordset cloning.
+     */
+    class Iterator : public boost::iterator_facade<
+      Iterator, std::pair<uint64_t, BlobAdapter>, boost::random_access_traversal_tag>
+    {
+    public:
+        Iterator() : rs(0) { }
+        Iterator(Recordset& _rs);
+
+    private:
+        friend class boost::iterator_core_access;
+
+        std::pair<uint64_t, BlobAdapter>& dereference() const;
+        void increment();
+        bool equal(const Iterator& x) const;
+
+        _RecordsetPtr rs;
+        std::pair<uint64_t, BlobAdapter> current;
+
+        void setCurrent();
+    };
+
+    friend class Iterator;
+#endif
+
+    Recordset() : rs(0) {}
+    virtual ~Recordset() { close(); }
+    void open(DatabaseConnection* conn, const std::string& table);
+    void close();
+    void requery();
+    operator _RecordsetPtr () { return rs; }
+#if 0
+    Iterator begin() { Iterator iter(*this); return iter; }
+    Iterator end() { Iterator iter; return iter; }
+#endif
+
+    // Dump table contents; useful for debugging.
+    void dump();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_RECORDSET_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "State.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include <comdef.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+State::State() : dbConn(0)
+{
+    HRESULT hr = ::CoInitializeEx(NULL, COINIT_MULTITHREADED);
+    if (hr != S_OK && hr != S_FALSE)
+        throw Exception("Error initializing COM");
+}
+
+State::~State()
+{
+    if (dbConn)
+        delete dbConn;
+    ::CoUninitialize();
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,52 @@
+#ifndef QPID_STORE_MSSQL_STATE_H
+#define QPID_STORE_MSSQL_STATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @struct State
+ *
+ * Represents a thread's state for accessing ADO and the database.
+ * Creating an instance of State initializes COM for this thread, and
+ * destroying it uninitializes COM. There's also a DatabaseConnection
+ * for this thread's default access to the database. More DatabaseConnections
+ * can always be created, but State has one that can always be used by
+ * the thread whose state is represented.
+ *
+ * This class is intended to be one-per-thread, so it should be accessed
+ * via thread-specific storage.
+ */
+struct State {
+    State();
+    ~State();
+    DatabaseConnection *dbConn;
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_STATE_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+template <class Wrapped>
+VariantHelper<Wrapped>::VariantHelper()
+{
+    var.vt = VT_EMPTY;
+}
+
+template <class Wrapped>
+VariantHelper<Wrapped>::operator const _variant_t& () const
+{
+    return var;
+}
+
+// Specialization for using _variant_t to wrap a std::string
+VariantHelper<std::string>::VariantHelper(const std::string &init)
+{
+    var.SetString(init.c_str());
+}
+
+VariantHelper<std::string>&
+VariantHelper<std::string>::operator=(const std::string &rhs)
+{
+    var.SetString(rhs.c_str());
+    return *this;
+}
+
+VariantHelper<std::string>::operator const _variant_t& () const
+{
+    return var;
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,61 @@
+#ifndef QPID_STORE_MSSQL_VARIANTHELPER_H
+#define QPID_STORE_MSSQL_VARIANTHELPER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class VariantHelper
+ *
+ * Class template to wrap the details of working with _variant_t objects.
+ */
+template <class Wrapped> class VariantHelper {
+private:
+    _variant_t var;
+
+public:
+    VariantHelper();
+    VariantHelper(const Wrapped &init);
+
+    VariantHelper& operator =(const Wrapped& rhs);
+    operator const _variant_t& () const;
+};
+
+// Specialization for using _variant_t to wrap a std::string
+template<> class VariantHelper<std::string> {
+private:
+    _variant_t var;
+
+public:
+    VariantHelper(const std::string &init);
+    VariantHelper& operator =(const std::string& rhs);
+    operator const _variant_t& () const;
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_VARIANTHELPER_H */



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org