You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/28 16:36:40 UTC

svn commit: r830613 [1/2] - in /qpid/branches/0.5.x-dev/qpid/cpp/src/qpid: broker/ store/ store/ms-sql/

Author: ritchiem
Date: Wed Oct 28 15:36:39 2009
New Revision: 830613

URL: http://svn.apache.org/viewvc?rev=830613&view=rev
Log:
Initial checkin of portable message store plugin and MS SQL-specific storage provider. Goes with QPID-2017

Added:
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/CMakeLists.txt
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StorageProvider.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StoreException.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Exception.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Recordset.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/State.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.cpp
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/VariantHelper.h
Modified:
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/MessageStore.h
    qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/PersistableMessage.h

Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/MessageStore.h?rev=830613&r1=830612&r2=830613&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/MessageStore.h Wed Oct 28 15:36:39 2009
@@ -46,15 +46,7 @@
   public:
 
     /**
-     * init the store, call before any other call. If not called, store
-     * is free to pick any defaults
-     *
-     * @param options Options object provided by concrete store plug in.
-     */
-    virtual bool init(const Options* options) = 0;
-
-    /**
-     * If called after init() but before recovery, will discard the database
+     * If called after initialization but before recovery, will discard the database
      * and reinitialize using an empty store dir. If the parameter pushDownStoreFiles
      * is true, the content of the store dir will be moved to a backup dir inside the
      * store dir. This is used when cluster nodes recover and must get thier content

Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=830613&r1=830612&r2=830613&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Oct 28 15:36:39 2009
@@ -104,9 +104,9 @@
 
     void flush();
     
-    bool QPID_BROKER_EXTERN isContentReleased() const;
+    QPID_BROKER_EXTERN bool isContentReleased() const;
 
-    void QPID_BROKER_EXTERN setStore(MessageStore*);
+    QPID_BROKER_EXTERN void setStore(MessageStore*);
     void requestContentRelease();
     void blockContentRelease();
     bool checkContentReleasable();

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/CMakeLists.txt (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/CMakeLists.txt Wed Oct 28 15:36:39 2009
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+project(qpidc_store)
+
+#set (CMAKE_VERBOSE_MAKEFILE ON)  # for debugging
+
+include_directories( ${Boost_INCLUDE_DIR} )
+
+include_directories( ${CMAKE_CURRENT_SOURCE_DIR} )
+include_directories( ${CMAKE_HOME_DIRECTORY}/include )
+
+link_directories( ${Boost_LIBRARY_DIRS} )
+
+set (store_SOURCES
+     MessageStorePlugin.cpp
+    )
+add_library (store MODULE ${store_SOURCES})
+target_link_libraries (store qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY})
+if (CMAKE_COMPILER_IS_GNUCXX)
+  set_target_properties (store PROPERTIES
+                         PREFIX ""
+                         LINK_FLAGS -Wl,--no-undefined)
+endif (CMAKE_COMPILER_IS_GNUCXX)
+
+if (CMAKE_SYSTEM_NAME STREQUAL Windows)
+  if (MSVC)
+    add_definitions( 
+      /D "NOMINMAX"
+      /D "WIN32_LEAN_AND_MEAN"
+    )
+  endif (MSVC)
+endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
+
+set_target_properties (store PROPERTIES VERSION ${qpidc_version})
+
+# Build the MS SQL Storage Provider plugin
+set (mssql_default ON)
+if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+  set(mssql_default OFF)
+endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+option(BUILD_MSSQL "Build MS SQL Store provider plugin" ${mssql_default})
+if (BUILD_MSSQL)
+  add_library (mssql_store MODULE
+               ms-sql/MsSqlProvider.cpp
+               ms-sql/AmqpTransaction.cpp
+               ms-sql/BindingRecordset.cpp
+               ms-sql/BlobAdapter.cpp
+               ms-sql/BlobEncoder.cpp
+               ms-sql/BlobRecordset.cpp
+               ms-sql/DatabaseConnection.cpp
+               ms-sql/MessageMapRecordset.cpp
+               ms-sql/MessageRecordset.cpp
+               ms-sql/Recordset.cpp
+               ms-sql/State.cpp
+               ms-sql/VariantHelper.cpp)
+  target_link_libraries (mssql_store qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
+endif (BUILD_MSSQL)

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,444 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageStorePlugin.h"
+#include "StorageProvider.h"
+#include "StoreException.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/DataDir.h"
+#include "qpid/log/Statement.h"
+
+/*
+ * The MessageStore pointer given to the Broker points to static storage.
+ * Thus, it cannot be deleted, especially by the broker. To prevent deletion,
+ * this no-op deleter is used with the boost::shared_ptr. When the last
+ * shared_ptr is destroyed, the deleter is called rather than delete().
+ */
+namespace {
+  class NoopDeleter {
+  public:
+      NoopDeleter() {}
+      void operator()(qpid::broker::MessageStore *p) {}
+  };
+}
+
+namespace qpid {
+namespace store {
+
+static MessageStorePlugin static_instance_registers_plugin;
+
+
+MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) :
+    qpid::Options(name)
+{
+    addOptions()
+        ("storage-provider", qpid::optValue(providerName, "PROVIDER"),
+         "Name of the storage provider to use.")
+        ;
+}
+
+
+void
+MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
+{
+    qpid::broker::Broker* broker =
+        dynamic_cast<qpid::broker::Broker*>(&target);
+    if (0 == broker)
+        return;        // Only listen to Broker targets
+
+    // See if there are any storage provider plugins ready. If not, we can't
+    // do a message store.
+    qpid::Plugin::earlyInitAll(*this);
+
+    if (providers.empty()) {
+        QPID_LOG(warning,
+                 "Message store plugin: No storage providers available.");
+        return;
+    }
+    if (!options.providerName.empty()) {
+        // If specific one was chosen, locate it in loaded set of providers.
+        provider = providers.find(options.providerName);
+        if (provider == providers.end())
+            throw Exception("Message store plugin: storage provider '" +
+                            options.providerName +
+                            "' does not exist.");
+    }
+    else {
+        // No specific provider chosen; if there's only one, use it. Else
+        // report the need to pick one.
+        if (providers.size() > 1)
+            throw Exception("Message store plugin: multiple provider plugins "
+                            "loaded; must either load only one or select one "
+                            "using --storage-provider");
+        provider = providers.begin();
+    }
+
+    provider->second->activate(*this);
+    NoopDeleter d;
+    boost::shared_ptr<qpid::broker::MessageStore> sp(this, d);
+    broker->setStore(sp);
+    target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
+}
+
+void
+MessageStorePlugin::initialize(qpid::Plugin::Target& target)
+{
+    qpid::broker::Broker* broker =
+        dynamic_cast<qpid::broker::Broker*>(&target);
+    if (0 == broker)
+        return;        // Only listen to Broker targets
+
+    // Pass along the initialize step to the provider that's activated.
+    if (provider != providers.end()) {
+        provider->second->initialize(*this);
+    }
+    //    qpid::Plugin::initializeAll(*this);
+}
+
+void
+MessageStorePlugin::finalizeMe()
+{
+    finalize();              // Call finalizers on any Provider plugins
+}
+
+void
+MessageStorePlugin::providerAvailable(const std::string name,
+                                      StorageProvider *be)
+{
+    ProviderMap::value_type newSp(name, be);
+    std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp);
+    if (inserted.second == false)
+        QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
+}
+
+void
+MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
+{
+    QPID_LOG(info, "Store: truncateInit");
+}
+
+
+/**
+ * Record the existence of a durable queue
+ */
+void
+MessageStorePlugin::create(broker::PersistableQueue& queue,
+                           const framing::FieldTable& args)
+{
+    if (queue.getName().size() == 0)
+    {
+        QPID_LOG(error,
+                 "Cannot create store for empty (null) queue name - "
+                 "ignoring and attempting to continue.");
+        return;
+    }
+    if (queue.getPersistenceId()) {
+        THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
+    }
+    provider->second->create(queue, args);
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableQueue& queue)
+{
+    provider->second->destroy(queue);
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MessageStorePlugin::create(const broker::PersistableExchange& exchange,
+                           const framing::FieldTable& args)
+{
+    if (exchange.getPersistenceId()) {
+        THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
+    }
+    provider->second->create(exchange, args);
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableExchange& exchange)
+{
+    provider->second->destroy(exchange);
+}
+
+/**
+ * Record a binding
+ */
+void
+MessageStorePlugin::bind(const broker::PersistableExchange& exchange,
+                         const broker::PersistableQueue& queue,
+                         const std::string& key,
+                         const framing::FieldTable& args)
+{
+    provider->second->bind(exchange, queue, key, args);
+}
+
+/**
+ * Forget a binding
+ */
+void
+MessageStorePlugin::unbind(const broker::PersistableExchange& exchange,
+                           const broker::PersistableQueue& queue,
+                           const std::string& key,
+                           const framing::FieldTable& args)
+{
+    provider->second->unbind(exchange, queue, key, args);
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MessageStorePlugin::create(const broker::PersistableConfig& config)
+{
+    if (config.getPersistenceId()) {
+        THROW_STORE_EXCEPTION("Config item already created: " +
+                              config.getName());
+    }
+    provider->second->create(config);
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MessageStorePlugin::destroy(const broker::PersistableConfig& config)
+{
+    provider->second->destroy(config);
+}
+
+/**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point).
+ */
+void
+MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
+{
+    if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
+        provider->second->stage(msg);
+    }
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MessageStorePlugin::destroy(broker::PersistableMessage& msg)
+{
+    if (msg.getPersistenceId())
+        provider->second->destroy(msg);
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MessageStorePlugin::appendContent
+  (const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+   const std::string& data)
+{
+    if (msg->getPersistenceId())
+        provider->second->appendContent(msg, data);
+    else
+        THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MessageStorePlugin::loadContent(const broker::PersistableQueue& queue,
+                                const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+                                std::string& data,
+                                uint64_t offset,
+                                uint32_t length)
+{
+    if (msg->getPersistenceId())
+        provider->second->loadContent(queue, msg, data, offset, length);
+    else
+        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::enqueue(broker::TransactionContext* ctxt,
+                            const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+                            const broker::PersistableQueue& queue)
+{
+    if (queue.getPersistenceId() == 0) {
+        THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
+    }
+    provider->second->enqueue(ctxt, msg, queue);
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::dequeue(broker::TransactionContext* ctxt,
+                            const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+                            const broker::PersistableQueue& queue)
+{
+    provider->second->dequeue(ctxt, msg, queue);
+}
+
+/**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: The operation is asynchronous so the return of this function does
+ * not mean the operation is complete.
+ */
+void
+MessageStorePlugin::flush(const broker::PersistableQueue& queue)
+{
+    provider->second->flush(queue);
+}
+
+/**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk.
+ */
+uint32_t
+MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue)
+{
+    return provider->second->outstandingQueueAIO(queue);
+}
+
+std::auto_ptr<broker::TransactionContext>
+MessageStorePlugin::begin()
+{
+    return provider->second->begin();
+}
+
+std::auto_ptr<broker::TPCTransactionContext>
+MessageStorePlugin::begin(const std::string& xid)
+{
+    return provider->second->begin(xid);
+}
+
+void
+MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt)
+{
+    provider->second->prepare(ctxt);
+}
+
+void
+MessageStorePlugin::commit(broker::TransactionContext& ctxt)
+{
+    provider->second->commit(ctxt);
+}
+
+void
+MessageStorePlugin::abort(broker::TransactionContext& ctxt)
+{
+    provider->second->abort(ctxt);
+}
+
+void
+MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids)
+{
+    provider->second->collectPreparedXids(xids);
+}
+
+/**
+ * Request recovery of queue and message state; inherited from Recoverable
+ */
+void
+MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
+{
+    ExchangeMap exchanges;
+    QueueMap queues;
+    MessageMap messages;
+    MessageQueueMap messageQueueMap;
+
+    provider->second->recoverConfigs(recoverer);
+    provider->second->recoverExchanges(recoverer, exchanges);
+    provider->second->recoverQueues(recoverer, queues);
+    provider->second->recoverBindings(recoverer, exchanges);
+    provider->second->recoverMessages(recoverer, messages, messageQueueMap);
+    // Enqueue msgs where needed.
+    for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
+         i != messageQueueMap.end();
+         ++i) {
+        // Locate the message corresponding to the current message Id
+        MessageMap::const_iterator iMsg = messages.find(i->first);
+        if (iMsg == messages.end()) {
+            std::ostringstream oss;
+            oss << "No matching message trying to re-enqueue message "
+                << i->first;
+            THROW_STORE_EXCEPTION(oss.str());
+        }
+        broker::RecoverableMessage::shared_ptr msg = iMsg->second;
+        // Now for each queue referenced in the queue map, locate it
+        // and re-enqueue the message.
+        for (std::vector<uint64_t>::const_iterator j = i->second.begin();
+             j != i->second.end();
+             ++j) {
+            // Locate the queue corresponding to the current queue Id
+            QueueMap::const_iterator iQ = queues.find(*j);
+            if (iQ == queues.end()) {
+                std::ostringstream oss;
+                oss << "No matching queue trying to re-enqueue message "
+                    << " on queue Id " << *j;
+                THROW_STORE_EXCEPTION(oss.str());
+            }
+            iQ->second->recover(msg);
+        }
+    }
+
+    // recoverTransactions() and apply correctly while re-enqueuing
+}
+
+}} // namespace qpid::store

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/MessageStorePlugin.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,284 @@
+#ifndef QPID_STORE_MESSAGESTOREPLUGIN_H
+#define QPID_STORE_MESSAGESTOREPLUGIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/PersistableExchange.h"
+#include "qpid/broker/PersistableMessage.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/management/Manageable.h"
+
+#include <string>
+
+using namespace qpid;
+
+namespace qpid {
+namespace store {
+
+class StorageProvider;
+
+/**
+ * @class MessageStorePlugin
+ *
+ * MessageStorePlugin is the front end of the persistent message store
+ * plugin. It is responsible for coordinating recovery, initialization,
+ * transactions (both local and distributed), flow-to-disk loading and
+ * unloading and persisting broker state (queues, bindings etc.).
+ * Actual storage operations are carried out by a message store storage
+ * provider that implements the qpid::store::StorageProvider interface.
+ */
+class MessageStorePlugin :
+    public qpid::Plugin,
+    public qpid::broker::MessageStore,        // Frontend classes
+    public qpid::Plugin::Target               // Provider target
+    // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info?    public qpid::management::Manageable
+{
+  public:
+    MessageStorePlugin() {}
+
+    /**
+     * @name Methods inherited from qpid::Plugin
+     */
+    //@{
+    virtual Options* getOptions() { return &options; }
+    virtual void earlyInitialize (Plugin::Target& target);
+    virtual void initialize(Plugin::Target& target);
+    //@}
+
+    /// Finalizer; calls Target::finalize() to run finalizers on
+    /// StorageProviders.
+    void finalizeMe();
+
+    /**
+     * Called by StorageProvider instances during the earlyInitialize sequence.
+     * Each StorageProvider must supply a unique name by which it is known and a
+     * pointer to itself.
+     */
+    virtual void providerAvailable(const std::string name, StorageProvider *be);
+
+    /**
+     * @name Methods inherited from qpid::broker::MessageStore
+     */
+    //@{
+    /**
+     * If called before recovery, will discard the database and reinitialize
+     * using an empty store. This is used when cluster nodes recover and
+     * must get their content from a cluster sync rather than directly from
+     * the store.
+     *
+     * @param saveStoreContent    If true, the store's contents should be
+     *                            saved to a backup location before
+     *                            reinitializing the store content.
+     */
+    virtual void truncateInit(const bool saveStoreContent = false);
+
+    /**
+     * Record the existence of a durable queue
+     */
+    virtual void create(broker::PersistableQueue& queue,
+                        const framing::FieldTable& args);
+    /**
+     * Destroy a durable queue
+     */
+    virtual void destroy(broker::PersistableQueue& queue);
+
+    /**
+     * Record the existence of a durable exchange
+     */
+    virtual void create(const broker::PersistableExchange& exchange,
+                        const framing::FieldTable& args);
+    /**
+     * Destroy a durable exchange
+     */
+    virtual void destroy(const broker::PersistableExchange& exchange);
+
+    /**
+     * Record a binding
+     */
+    virtual void bind(const broker::PersistableExchange& exchange,
+                      const broker::PersistableQueue& queue,
+                      const std::string& key,
+                      const framing::FieldTable& args);
+
+    /**
+     * Forget a binding
+     */
+    virtual void unbind(const broker::PersistableExchange& exchange,
+                        const broker::PersistableQueue& queue,
+                        const std::string& key,
+                        const framing::FieldTable& args);
+
+    /**
+     * Record generic durable configuration
+     */
+    virtual void create(const broker::PersistableConfig& config);
+
+    /**
+     * Destroy generic durable configuration
+     */
+    virtual void destroy(const broker::PersistableConfig& config);
+
+    /**
+     * Stores a message before it has been enqueued
+     * (enqueueing automatically stores the message so this is
+     * only required if storage is required prior to that
+     * point). If the message has not yet been stored it will
+     * store the headers as well as any content passed in. A
+     * persistence id will be set on the message which can be
+     * used to load the content or to append to it.
+     */
+    virtual void stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg);
+
+    /**
+     * Destroys a previously staged message. This only needs
+     * to be called if the message is never enqueued. (Once
+     * enqueued, deletion will be automatic when the message
+     * is dequeued from all queues it was enqueued onto).
+     */
+    virtual void destroy(broker::PersistableMessage& msg);
+
+    /**
+     * Appends content to a previously staged message
+     */
+    virtual void appendContent(const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+                               const std::string& data);
+
+    /**
+     * Loads (a section) of content data for the specified
+     * message (previously stored through a call to stage or
+     * enqueue) into data. The offset refers to the content
+     * only (i.e. an offset of 0 implies that the start of the
+     * content should be loaded, not the headers or related
+     * meta-data).
+     */
+    virtual void loadContent(const broker::PersistableQueue& queue,
+                             const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
+                             std::string& data,
+                             uint64_t offset,
+                             uint32_t length);
+
+    /**
+     * Enqueues a message, storing the message if it has not
+     * been previously stored and recording that the given
+     * message is on the given queue.
+     *
+     * Note: The operation is asynchronous so the return of this function does
+     * not mean the operation is complete.
+     *
+     * @param msg the message to enqueue
+     * @param queue the name of the queue onto which it is to be enqueued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void enqueue(broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+                         const broker::PersistableQueue& queue);
+
+    /**
+     * Dequeues a message, recording that the given message is
+     * no longer on the given queue and deleting the message
+     * if it is no longer on any other queue.
+     *
+     *
+     * Note: The operation is asynchronous so the return of this function does
+     * not mean the operation is complete.
+     *
+     * @param msg the message to dequeue
+     * @param queue the name of the queue from which it is to be dequeued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void dequeue(broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<broker::PersistableMessage>& msg,
+                         const broker::PersistableQueue& queue);
+
+    /**
+     * Flushes all async messages to disk for the specified queue
+     *
+     *
+     * Note: The operation is asynchronous so the return of this function does
+     * not mean the operation is complete.
+     *
+     * @param queue the name of the queue from which it is to be dequeued
+     */
+    virtual void flush(const broker::PersistableQueue& queue);
+
+    /**
+     * Returns the number of outstanding AIO's for a given queue
+     *
+     * If 0, than all the enqueue / dequeues have been stored
+     * to disk
+     *
+     * @param queue the name of the queue to check for outstanding AIO
+     */
+    virtual uint32_t outstandingQueueAIO(const broker::PersistableQueue& queue);
+    //@}
+
+    /**
+     * @name Methods inherited from qpid::broker::TransactionalStore
+     */
+    //@{
+    std::auto_ptr<broker::TransactionContext> begin();
+
+    std::auto_ptr<broker::TPCTransactionContext> begin(const std::string& xid);
+
+    void prepare(broker::TPCTransactionContext& ctxt);
+
+    void commit(broker::TransactionContext& ctxt);
+
+    void abort(broker::TransactionContext& ctxt);
+
+    void collectPreparedXids(std::set<std::string>& xids);
+    //@}
+
+    /**
+     * Request recovery of queue and message state; inherited from Recoverable
+     */
+    virtual void recover(broker::RecoveryManager& recoverer);
+
+    //    inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&)
+    //        { return management::Manageable::STATUS_OK; }
+
+  protected:
+
+    struct StoreOptions : public qpid::Options {
+        StoreOptions(const std::string& name="Store Options");
+        std::string providerName;
+    };
+    StoreOptions options;
+
+    typedef std::map<const std::string, StorageProvider*> ProviderMap;
+    ProviderMap providers;
+    ProviderMap::const_iterator provider;
+
+}; // class MessageStoreImpl
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif /* QPID_SERIALIZER_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StorageProvider.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StorageProvider.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StorageProvider.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,322 @@
+#ifndef QPID_STORE_STORAGEPROVIDER_H
+#define QPID_STORE_STORAGEPROVIDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include <stdexcept>
+#include <vector>
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/MessageStore.h"
+
+using qpid::broker::PersistableConfig;
+using qpid::broker::PersistableExchange;
+using qpid::broker::PersistableMessage;
+using qpid::broker::PersistableQueue;
+
+namespace qpid {
+namespace store {
+
+typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr>
+    ExchangeMap;
+typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr>
+    QueueMap;
+typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr>
+    MessageMap;
+// Msg Id -> vector of queue Ids where message is queued
+typedef std::map<uint64_t, std::vector<uint64_t> > MessageQueueMap;
+
+class MessageStorePlugin;
+
+/**
+ * @class StorageProvider
+ *
+ * StorageProvider defines the interface for the storage provider plugin to the
+ * Qpid broker persistence store plugin.
+ *
+ * @TODO Should StorageProvider also inherit from MessageStore? If so, then
+ *       maybe remove Recoverable from MessageStore's inheritance and move it
+ *       to MessageStorePlugin? In any event, somehow the discardInit() feature
+ *       needs to get added here.
+ */
+class StorageProvider : public qpid::Plugin, public qpid::broker::MessageStore
+{
+public:
+
+    class Exception : public std::exception
+    {
+    public:
+        virtual ~Exception() throw() {}
+        virtual const char *what() const throw() = 0;
+    };
+
+    /**
+     * @name Methods inherited from qpid::Plugin
+     */
+    //@{
+    /**
+     * Return a pointer to the provider's options. The options will be
+     * updated during option parsing by the host program; therefore, the
+     * referenced Options object must remain valid past this function's return.
+     * 
+     * @return An options group or 0 for no options. Default returns 0.
+     * Plugin retains ownership of return value.
+     */
+    virtual qpid::Options* getOptions() = 0;
+
+    /**
+     * Initialize Plugin functionality on a Target, called before
+     * initializing the target.
+     *
+     * StorageProviders should respond only to Targets of class
+     * qpid::store::MessageStorePlugin and ignore all others.
+     *
+     * When called, the provider should invoke the method
+     * qpid::store::MessageStorePlugin::providerAvailable() to alert the
+     * message store of StorageProvider's availability.
+     *
+     * Called before the target itself is initialized.
+     */
+    virtual void earlyInitialize (Plugin::Target& target) = 0;
+
+    /**
+     * Initialize StorageProvider functionality. Called after initializing
+     * the target.
+     * 
+     * StorageProviders should respond only to Targets of class
+     * qpid::store::MessageStorePlugin and ignore all others.
+     *
+     * Called after the target is fully initialized.
+     */
+    virtual void initialize(Plugin::Target& target) = 0;
+    //@}
+
+    /**
+     * Receive notification that this provider is the one that will actively
+     * handle storage for the target. If the provider is to be used, this
+     * method will be called after earlyInitialize() and before any
+     * recovery operations (recovery, in turn, precedes call to initialize()).
+     * Thus, it is wise to not actually do any database ops from within
+     * earlyInitialize() - they can wait until activate() is called because
+     * at that point it is certain the database will be needed.
+     */
+    virtual void activate(MessageStorePlugin &store) = 0;
+
+    /**
+     * @name Methods inherited from qpid::broker::MessageStore
+     */
+    //@{
+    /**
+     * If called after init() but before recovery, will discard the database
+     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+     * is true, the content of the store dir will be moved to a backup dir
+     * inside the store dir. This is used when cluster nodes recover and must
+     * get thier content from a cluster sync rather than directly fromt the
+     * store.
+     *
+     * @param pushDownStoreFiles If true, will move content of the store dir
+     *                           into a subdir, leaving the store dir
+     *                           otherwise empty.
+     */
+    virtual void truncateInit(const bool pushDownStoreFiles = false) = 0;
+
+    /**
+     * Record the existence of a durable queue
+     */
+    virtual void create(PersistableQueue& queue,
+                        const qpid::framing::FieldTable& args) = 0;
+    /**
+     * Destroy a durable queue
+     */
+    virtual void destroy(PersistableQueue& queue) = 0;
+
+    /**
+     * Record the existence of a durable exchange
+     */
+    virtual void create(const PersistableExchange& exchange,
+                        const qpid::framing::FieldTable& args) = 0;
+    /**
+     * Destroy a durable exchange
+     */
+    virtual void destroy(const PersistableExchange& exchange) = 0;
+
+    /**
+     * Record a binding
+     */
+    virtual void bind(const PersistableExchange& exchange,
+                      const PersistableQueue& queue,
+                      const std::string& key,
+                      const qpid::framing::FieldTable& args) = 0;
+
+    /**
+     * Forget a binding
+     */
+    virtual void unbind(const PersistableExchange& exchange,
+                        const PersistableQueue& queue,
+                        const std::string& key,
+                        const qpid::framing::FieldTable& args) = 0;
+
+    /**
+     * Record generic durable configuration
+     */
+    virtual void create(const PersistableConfig& config) = 0;
+
+    /**
+     * Destroy generic durable configuration
+     */
+    virtual void destroy(const PersistableConfig& config) = 0;
+
+    /**
+     * Stores a messages before it has been enqueued
+     * (enqueueing automatically stores the message so this is
+     * only required if storage is required prior to that
+     * point). If the message has not yet been stored it will
+     * store the headers as well as any content passed in. A
+     * persistence id will be set on the message which can be
+     * used to load the content or to append to it.
+     */
+    virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0;
+
+    /**
+     * Destroys a previously staged message. This only needs
+     * to be called if the message is never enqueued. (Once
+     * enqueued, deletion will be automatic when the message
+     * is dequeued from all queues it was enqueued onto).
+     */
+    virtual void destroy(PersistableMessage& msg) = 0;
+
+    /**
+     * Appends content to a previously staged message
+     */
+    virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+                               const std::string& data) = 0;
+
+    /**
+     * Loads (a section) of content data for the specified
+     * message (previously stored through a call to stage or
+     * enqueue) into data. The offset refers to the content
+     * only (i.e. an offset of 0 implies that the start of the
+     * content should be loaded, not the headers or related
+     * meta-data).
+     */
+    virtual void loadContent(const PersistableQueue& queue,
+                             const boost::intrusive_ptr<const PersistableMessage>& msg,
+                             std::string& data,
+                             uint64_t offset,
+                             uint32_t length) = 0;
+
+    /**
+     * Enqueues a message, storing the message if it has not
+     * been previously stored and recording that the given
+     * message is on the given queue.
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
+     *
+     * @param msg the message to enqueue
+     * @param queue the name of the queue onto which it is to be enqueued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue) = 0;
+
+    /**
+     * Dequeues a message, recording that the given message is
+     * no longer on the given queue and deleting the message
+     * if it is no longer on any other queue.
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
+     *
+     * @param msg the message to dequeue
+     * @param queue the name of the queue from which it is to be dequeued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue) = 0;
+
+    /**
+     * Flushes all async messages to disk for the specified queue
+     *
+     * Note: that this is async so the return of the function does
+     * not mean the opperation is complete.
+     *
+     * @param queue the name of the queue from which it is to be dequeued
+     */
+    virtual void flush(const qpid::broker::PersistableQueue& queue) = 0;
+
+    /**
+     * Returns the number of outstanding AIO's for a given queue
+     *
+     * If 0, than all the enqueue / dequeues have been stored
+     * to disk
+     *
+     * @param queue the name of the queue to check for outstanding AIO
+     */
+    virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
+    //@}
+
+    /**
+     * @TODO This should probably not be here - it's only here because
+     * MessageStore inherits from Recoverable... maybe move that derivation.
+     *
+     * As it is now, we don't use this. Separate recover methods are
+     * declared below for individual types, which also set up maps of
+     * messages, queues, transactions for the main store plugin to handle
+     * properly.
+     *
+     * Request recovery of queue and message state.
+     */
+    virtual void recover(qpid::broker::RecoveryManager& recoverer) {}
+
+    /**
+     * @name Methods that do the recovery of the various objects that
+     * were saved.
+     */
+    //@{
+
+    /**
+     * Recover bindings.
+     */
+    virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer) = 0;
+    virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+                                  ExchangeMap& exchangeMap) = 0;
+    virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+                               QueueMap& queueMap) = 0;
+    virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+                                 const ExchangeMap& exchangeMap) = 0;
+    virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+                                 MessageMap& messageMap,
+                                 MessageQueueMap& messageQueueMap) = 0;
+    //@}
+};
+
+}} // namespace qpid::store
+
+#endif /* QPID_STORE_STORAGEPROVIDER_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StoreException.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StoreException.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StoreException.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/StoreException.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,49 @@
+#ifndef QPID_STORE_STOREEXCEPTION_H
+#define QPID_STORE_STOREEXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <exception>
+#include <boost/format.hpp>
+#include "StorageProvider.h"
+
+namespace qpid {
+namespace store {
+
+class StoreException : public std::exception
+{
+    std::string text;
+public:
+    StoreException(const std::string& _text) : text(_text) {}
+    StoreException(const std::string& _text,
+                   const StorageProvider::Exception& cause)
+        : text(_text + ": " + cause.what()) {}
+    virtual ~StoreException() throw() {}
+    virtual const char* what() const throw() { return text.c_str(); }
+};
+
+#define THROW_STORE_EXCEPTION(MESSAGE) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__))
+#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw qpid::store::StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__), EXCEPTION)
+
+}}  // namespace qpid::store
+
+#endif /* QPID_STORE_STOREEXCEPTION_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AmqpTransaction.h"
+#include "DatabaseConnection.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+AmqpTransaction::AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db)
+  : db(_db), transDepth(0)
+{
+}
+
+AmqpTransaction::~AmqpTransaction()
+{
+    if (transDepth > 0)
+        this->abort();
+}
+
+void
+AmqpTransaction::begin()
+{
+    _bstr_t beginCmd("BEGIN TRANSACTION");
+    _ConnectionPtr c = *db;
+    c->Execute(beginCmd, NULL, adExecuteNoRecords);
+    ++transDepth;
+}
+
+void
+AmqpTransaction::commit()
+{
+    if (transDepth > 0) {
+        _bstr_t commitCmd("COMMIT TRANSACTION");
+        _ConnectionPtr c = *db;
+        c->Execute(commitCmd, NULL, adExecuteNoRecords);
+        --transDepth;
+    }
+}
+
+void
+AmqpTransaction::abort()
+{
+    if (transDepth > 0) {
+        _bstr_t rollbackCmd("ROLLBACK TRANSACTION");
+        _ConnectionPtr c = *db;
+        c->Execute(rollbackCmd, NULL, adExecuteNoRecords);
+        transDepth = 0;
+    }
+}
+
+AmqpTPCTransaction::AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+                                       const std::string& _xid)
+  : AmqpTransaction(_db), xid(_xid)
+{
+}
+
+AmqpTPCTransaction::~AmqpTPCTransaction()
+{
+}
+
+void
+AmqpTPCTransaction::prepare()
+{
+    // Intermediate transactions should have already assured integrity of
+    // the content in the database; just waiting to pull the trigger on the
+    // outermost transaction.
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/AmqpTransaction.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,84 @@
+#ifndef QPID_STORE_MSSQL_AMQPTRANSACTION_H
+#define QPID_STORE_MSSQL_AMQPTRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/TransactionalStore.h>
+#include <string>
+#include <memory>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class DatabaseConnection;
+
+/**
+ * @class AmqpTransaction
+ *
+ * Class representing an AMQP transaction. This is used around a set of
+ * enqueue and dequeue operations that occur when the broker is acting
+ * on a transaction commit/abort from the client.
+ */
+class AmqpTransaction : public qpid::broker::TransactionContext {
+
+    std::auto_ptr<DatabaseConnection> db;
+
+    // Since ADO w/ SQLOLEDB can't do nested transaction via its BeginTrans(),
+    // et al, nested transactions are carried out with direct SQL commands.
+    // To ensure the state of this is known, keep track of how deeply the
+    // transactions are nested.
+    unsigned int transDepth;
+
+public:
+    AmqpTransaction(std::auto_ptr<DatabaseConnection>& _db);
+    virtual ~AmqpTransaction();
+
+    DatabaseConnection *dbConn() { return db.get(); }
+
+    void begin();
+    void commit();
+    void abort();
+};
+
+/**
+ * @class AmqpTPCTransaction
+ *
+ * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is
+ * used around a set of enqueue and dequeue operations that occur when the
+ * broker is acting on a transaction prepare/commit/abort from the client.
+ */
+class AmqpTPCTransaction : public AmqpTransaction,
+                           public qpid::broker::TPCTransactionContext {
+    std::string  xid;
+
+public:
+    AmqpTPCTransaction(std::auto_ptr<DatabaseConnection>& _db,
+                       const std::string& _xid);
+    virtual ~AmqpTPCTransaction();
+
+    void prepare();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_AMQPTRANSACTION_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "BindingRecordset.h"
+#include "BlobAdapter.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BindingRecordset::add(uint64_t exchangeId,
+                      const std::string& queueName,
+                      const std::string& routingKey,
+                      const qpid::framing::FieldTable& args)
+{
+    VariantHelper<std::string> queueNameStr(queueName);
+    VariantHelper<std::string> routingKeyStr(routingKey);
+    BlobEncoder blob (args);   // Marshall field table to a blob
+    rs->AddNew();
+    rs->Fields->GetItem("exchangeId")->Value = exchangeId;
+    rs->Fields->GetItem("queueName")->Value = queueNameStr;
+    rs->Fields->GetItem("routingKey")->Value = routingKeyStr;
+    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+    rs->Update();
+}
+
+void
+BindingRecordset::remove(uint64_t exchangeId,
+                         const std::string& queueName,
+                         const std::string& routingKey,
+                         const qpid::framing::FieldTable& /*args*/)
+{
+    // Look up the affected binding.
+    std::ostringstream filter;
+    filter << "exchangeId = " << exchangeId
+           << " AND queueName = '" << queueName << "'"
+           << " AND routingKey = '" << routingKey << "'" << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (rs->RecordCount != 0) {
+        // Delete the records
+        rs->Delete(adAffectGroup);
+        rs->Update();
+    }
+    requery();
+}
+
+void
+BindingRecordset::remove(uint64_t exchangeId)
+{
+    // Look up the affected bindings by the exchange ID
+    std::ostringstream filter;
+    filter << "exchangeId = " << exchangeId << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (rs->RecordCount != 0) {
+        // Delete the records
+        rs->Delete(adAffectGroup);
+        rs->Update();
+    }
+    requery();
+}
+
+void
+BindingRecordset::remove(const std::string& queueName)
+{
+    // Look up the affected bindings by the exchange ID
+    std::ostringstream filter;
+    filter << "queueName = '" << queueName << "'" << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (rs->RecordCount != 0) {
+        // Delete the records
+        rs->Delete(adAffectGroup);
+        rs->Update();
+    }
+    requery();
+}
+
+void
+BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
+                          std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap)
+{
+    if (rs->BOF && rs->EndOfFile)
+        return;   // Nothing to do
+    rs->MoveFirst();
+    Binding b;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&b);
+    while (!rs->EndOfFile) {
+        long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId];
+        std::string q(b.queueName), k(b.routingKey);
+        exch->bind(q, k, blob);
+        rs->MoveNext();
+    }
+
+    piAdoRecordBinding->Release();
+}
+
+void
+BindingRecordset::dump()
+{
+    Recordset::dump();
+    if (rs->EndOfFile && rs->BOF)    // No records
+        return;
+    rs->MoveFirst();
+
+    Binding b;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&b);
+   
+    while (VARIANT_FALSE == rs->EndOfFile) {
+      QPID_LOG(notice, "exch " << b.exchangeId
+                       << ", q: " << b.queueName
+                       << ", k: " << b.routingKey);
+      rs->MoveNext();
+    }
+
+    piAdoRecordBinding->Release();
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,84 @@
+#ifndef QPID_STORE_MSSQL_BINDINGRECORDSET_H
+#define QPID_STORE_MSSQL_BINDINGRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <icrsint.h>
+#include "Recordset.h"
+#include <qpid/broker/RecoveryManager.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BindingRecordset
+ *
+ * Class for the binding records.
+ */
+class BindingRecordset : public Recordset {
+
+    class Binding : public CADORecordBinding {
+        BEGIN_ADO_BINDING(Binding)
+          ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE)
+          ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName, 
+                                     sizeof(queueName), FALSE)
+          ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey, 
+                                     sizeof(routingKey), FALSE)
+        END_ADO_BINDING()
+
+    public:
+        uint64_t exchangeId;
+        char queueName[256];
+        char routingKey[256];
+    };
+
+public:
+    // Add a new binding
+    void add(uint64_t exchangeId,
+             const std::string& queueName,
+             const std::string& routingKey,
+             const qpid::framing::FieldTable& args);
+
+    // Remove a specific binding
+    void remove(uint64_t exchangeId,
+                const std::string& queueName,
+                const std::string& routingKey,
+                const qpid::framing::FieldTable& args);
+
+    // Remove all bindings for the specified exchange
+    void remove(uint64_t exchangeId);
+
+    // Remove all bindings for the specified queue
+    void remove(const std::string& queueName);
+
+    // Recover bindings set using exchMap to get from Id to RecoverableExchange.
+    void recover(qpid::broker::RecoveryManager& recoverer,
+                 std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap);
+
+    // Dump table contents; useful for debugging.
+    void dump();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BINDINGRECORDSET_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BlobAdapter.h"
+#include <qpid/Exception.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BlobAdapter::extractBuff()
+{
+    // To give a valid Buffer back, lock the safearray, obtaining a pointer to
+    // the actual data. Record the pointer in the Buffer so the destructor
+    // knows to unlock the safearray.
+    if (buff.getPointer() == 0) {
+        char *blob;
+        SafeArrayAccessData(this->parray, (void **)&blob);
+        qpid::framing::Buffer lockedBuff(blob, buff.getSize());
+        buff = lockedBuff;
+    }
+}
+
+
+BlobAdapter::~BlobAdapter()
+{
+    // If buff's pointer is set, the safearray is locked, so unlock it
+    if (buff.getPointer() != 0)
+        SafeArrayUnaccessData(this->parray);
+}
+
+BlobAdapter::operator qpid::framing::Buffer& ()
+{
+    extractBuff();
+    return buff;
+}
+
+BlobAdapter::operator qpid::framing::FieldTable& ()
+{
+    extractBuff();
+    fields.decode(buff);
+    return fields;
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobAdapter.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,62 @@
+#ifndef QPID_STORE_MSSQL_BLOBADAPTER_H
+#define QPID_STORE_MSSQL_BLOBADAPTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobAdapter
+ *
+ * Adapter for accessing a blob (varbinary SQL field) as a qpid::framing::Buffer
+ * in an exception-safe way.
+ */
+class BlobAdapter : public _variant_t {
+private:
+    // This Buffer's pointer indicates whether or not a safearray has
+    // been locked; if it's 0, no locking was done.
+    qpid::framing::Buffer buff;
+    qpid::framing::FieldTable fields;
+
+    void extractBuff();
+
+public:
+    // Initialize with the known length of the data that will come.
+    // Assigning a _variant_t to this object will set up the array to be
+    // accessed with the operator Buffer&()
+    BlobAdapter(long blobSize) : _variant_t(), buff(0, blobSize) {}
+    ~BlobAdapter();
+    BlobAdapter& operator=(_variant_t& var_t_Src)
+      { _variant_t::operator=(var_t_Src); return *this; }
+    operator qpid::framing::Buffer& ();
+    operator qpid::framing::FieldTable& ();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBADAPTER_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BlobEncoder.h"
+#include <qpid/Exception.h>
+#include <qpid/broker/Persistable.h>
+#include <qpid/broker/PersistableMessage.h>
+#include <boost/intrusive_ptr.hpp>
+#include <memory.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+template <class ITEM> void
+BlobEncoder::encode(const ITEM &item)
+{
+    SAFEARRAYBOUND bound[1] = {0, 0};
+    bound[0].cElements = item.encodedSize();
+    blob = SafeArrayCreate(VT_UI1, 1, bound);
+    if (S_OK != SafeArrayLock(blob)) {
+        SafeArrayDestroy(blob);
+        blob = 0;
+        throw qpid::Exception("Error locking blob area for persistable item");
+    }
+    try {
+        qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements);
+        item.encode(buff);
+    }
+    catch(...) {
+        SafeArrayUnlock(blob);
+        SafeArrayDestroy(blob);
+        blob = 0;
+        throw;
+    }
+    this->vt = VT_ARRAY | VT_UI1;
+    this->parray = blob;
+    SafeArrayUnlock(blob);
+}
+
+template <> void
+BlobEncoder::encode(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &item)
+{
+    // NOTE! If this code changes, verify the recovery code in MessageRecordset
+    SAFEARRAYBOUND bound[1] = {0, 0};
+    bound[0].cElements = item->encodedSize() + sizeof(uint32_t);
+    blob = SafeArrayCreate(VT_UI1, 1, bound);
+    if (S_OK != SafeArrayLock(blob)) {
+        SafeArrayDestroy(blob);
+        blob = 0;
+        throw qpid::Exception("Error locking blob area for message");
+    }
+    try {
+        uint32_t headerSize = item->encodedHeaderSize();
+        qpid::framing::Buffer buff((char *)blob->pvData, bound[0].cElements);
+        buff.putLong(headerSize);
+        item->encode(buff);
+    }
+    catch(...) {
+        SafeArrayUnlock(blob);
+        SafeArrayDestroy(blob);
+        blob = 0;
+        throw;
+    }
+    this->vt = VT_ARRAY | VT_UI1;
+    this->parray = blob;
+    SafeArrayUnlock(blob);
+}
+
+template <> void
+BlobEncoder::encode(const std::string &item)
+{
+    SAFEARRAYBOUND bound[1] = {0, 0};
+    bound[0].cElements = item.size();
+    blob = SafeArrayCreate(VT_UI1, 1, bound);
+    if (S_OK != SafeArrayLock(blob)) {
+        SafeArrayDestroy(blob);
+        blob = 0;
+        throw qpid::Exception("Error locking blob area for string");
+    }
+    memcpy_s(blob->pvData, item.size(), item.data(), item.size());
+    this->vt = VT_ARRAY | VT_UI1;
+    this->parray = blob;
+    SafeArrayUnlock(blob);
+}
+
+BlobEncoder::BlobEncoder(const qpid::broker::Persistable &item) : blob(0)
+{
+    encode(item);
+}
+
+BlobEncoder::BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg) : blob(0)
+{
+    encode(msg);
+}
+
+BlobEncoder::BlobEncoder(const qpid::framing::FieldTable &fields) : blob(0)
+{
+    encode(fields);
+}
+
+BlobEncoder::BlobEncoder(const std::string &data) : blob(0)
+{
+    encode(data);
+}
+
+BlobEncoder::~BlobEncoder()
+{
+    if (blob)
+        SafeArrayDestroy(blob);
+    blob = 0;
+    this->parray = 0;
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobEncoder.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,61 @@
+#ifndef QPID_STORE_MSSQL_BLOBENCODER_H
+#define QPID_STORE_MSSQL_BLOBENCODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <comutil.h>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/Persistable.h>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobEncoder
+ *
+ * Encodes a blob (varbinary) field from a qpid::broker::Persistable or a
+ * qpid::framing::FieldTable (both of which can be encoded to
+ * qpid::framing::Buffer) so it can be passed to ADO methods for writing
+ * to the database.
+ */
+class BlobEncoder : public _variant_t {
+private:
+    SAFEARRAY *blob;
+
+    template <class ITEM> void encode(const ITEM &item);
+
+public:
+    BlobEncoder(const qpid::broker::Persistable &item);
+    BlobEncoder(const boost::intrusive_ptr<qpid::broker::PersistableMessage> &msg);
+    BlobEncoder(const qpid::framing::FieldTable &fields);
+    BlobEncoder(const std::string& data);
+    ~BlobEncoder();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBENCODER_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <qpid/log/Statement.h>
+
+#include "BlobRecordset.h"
+#include "BlobEncoder.h"
+#include "VariantHelper.h"
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+void
+BlobRecordset::remove(uint64_t id)
+{
+    // Look up the item by its persistenceId
+    std::ostringstream filter;
+    filter << "persistenceId = " << id << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (!rs->EndOfFile) {
+        // Delete the record
+        rs->Delete(adAffectCurrent);
+        rs->Update();
+    }
+}
+
+void
+BlobRecordset::add(const qpid::broker::Persistable& item)
+{
+    BlobEncoder blob (item);   // Marshall item info to a blob
+    rs->AddNew();
+    item.setPersistenceId(rs->Fields->Item["persistenceId"]->Value);
+    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+    rs->Update();
+}
+
+void
+BlobRecordset::remove(const qpid::broker::Persistable& item)
+{
+    remove(item.getPersistenceId());
+}
+
+void
+BlobRecordset::dump()
+{
+    Recordset::dump();
+#if 1
+    if (rs->EndOfFile && rs->BOF)    // No records
+        return;
+
+    rs->MoveFirst();
+    while (!rs->EndOfFile) {
+        uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+        QPID_LOG(notice, "  -> " << id);
+        rs->MoveNext();
+    }
+#else
+    for (Iterator iter = begin(); iter != end(); ++iter) {
+        uint64_t id = *iter.first;
+        QPID_LOG(notice, "  -> " << id);
+    }
+#endif
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,53 @@
+#ifndef QPID_STORE_MSSQL_BLOBRECORDSET_H
+#define QPID_STORE_MSSQL_BLOBRECORDSET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Recordset.h"
+#include <qpid/broker/Persistable.h>
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class BlobRecordset
+ *
+ * Class for the "blob" records that record an id, varbinary(max) pair.
+ */
+class BlobRecordset : public Recordset {
+protected:
+    // Remove a record given its Id.
+    void remove(uint64_t id);
+
+public:
+    void add(const qpid::broker::Persistable& item);
+    void remove(const qpid::broker::Persistable& item);
+
+    // Dump table contents; useful for debugging.
+    void dump();
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_BLOBRECORDSET_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.cpp Wed Oct 28 15:36:39 2009
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+}
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+DatabaseConnection::DatabaseConnection() : conn(0)
+{
+}
+
+DatabaseConnection::~DatabaseConnection()
+{
+    close();
+}
+
+void
+DatabaseConnection::open(const std::string& connectString,
+                         const std::string& dbName)
+{
+    if (conn && conn->State == adStateOpen)
+        return;
+    std::string adoConnect = "Provider=SQLOLEDB;" + connectString;
+    try {
+        TESTHR(conn.CreateInstance(__uuidof(Connection)));
+        conn->ConnectionString = adoConnect.c_str();
+        conn->Open("", "", "", adConnectUnspecified);
+        if (dbName.length() > 0)
+            conn->DefaultDatabase = dbName.c_str();
+    }
+    catch(_com_error &e) {
+        close();
+        throw ADOException("MSSQL can't open " + dbName + " at " + adoConnect, e);
+    }
+}
+
+void
+DatabaseConnection::close()
+{
+    if (conn && conn->State == adStateOpen)
+        conn->Close();
+    conn = 0;
+}
+
+}}}  // namespace qpid::store::ms_sql

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/DatabaseConnection.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,62 @@
+#ifndef QPID_STORE_MSSQL_DATABASECONNECTION_H
+#define QPID_STORE_MSSQL_DATABASECONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+        no_namespace rename("EOF", "EndOfFile")
+
+#include <string>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+/**
+ * @class DatabaseConnection
+ *
+ * Represents a connection to the SQL database. This class wraps the
+ * needed _ConnectionPtr for ADO as well as the needed COM initialization
+ * and cleanup that each thread requires. It is expected that this class
+ * will be maintained in thread-specific storage so it has no locks.
+ */
+class DatabaseConnection {
+protected:
+    _ConnectionPtr conn;
+
+public:
+    DatabaseConnection();
+    ~DatabaseConnection();
+    void open(const std::string& connectString,
+              const std::string& dbName = "");
+    void close();
+    operator _ConnectionPtr () { return conn; }
+
+    void beginTransaction() { conn->BeginTrans(); }
+    void commitTransaction() {conn->CommitTrans(); }
+    void rollbackTransaction() { conn->RollbackTrans(); }
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_DATABASECONNECTION_H */

Added: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Exception.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Exception.h?rev=830613&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Exception.h (added)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/store/ms-sql/Exception.h Wed Oct 28 15:36:39 2009
@@ -0,0 +1,56 @@
+#ifndef QPID_STORE_MSSQL_EXCEPTION_H
+#define QPID_STORE_MSSQL_EXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <comdef.h>
+#include <qpid/store/StorageProvider.h>
+
+namespace qpid {
+namespace store {
+namespace ms_sql {
+
+class Exception : public qpid::store::StorageProvider::Exception
+{
+protected:
+    std::string text;
+public:
+    Exception(const std::string& _text) : text(_text) {}
+    virtual ~Exception() {}
+    virtual const char* what() const throw() { return text.c_str(); }
+};
+
+class ADOException : public Exception
+{
+public:
+    ADOException(const std::string& _text, _com_error &e)
+      : Exception(_text) {
+        text += ": ";
+        _bstr_t wmsg = e.Description();
+        text += (const char *)wmsg;
+    }
+};
+
+}}}  // namespace qpid::store::ms_sql
+
+#endif /* QPID_STORE_MSSQL_EXCEPTION_H */



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org