You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2010/10/22 01:09:01 UTC

svn commit: r1026175 [1/2] - in /qpid/trunk/qpid/cpp/src/qpid/store: ./ ms-clfs/ ms-sql/

Author: shuston
Date: Thu Oct 21 23:09:00 2010
New Revision: 1026175

URL: http://svn.apache.org/viewvc?rev=1026175&view=rev
Log:
Add a hybrid SQL-CLFS store that can be used on Windows Vista, Windows Server 2008, and later; these systems provide the Common Log File System (CLFS) facility. The CLFS store uses CLFS instead of SQL to store and recover messages and transactions. SQL still holds queues, bindings, exchanges, and configs.

Added:
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.h
    qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=1026175&r1=1026174&r2=1026175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt Thu Oct 21 23:09:00 2010
@@ -80,3 +80,32 @@ if (BUILD_MSSQL)
            DESTINATION ${QPIDD_MODULE_DIR}
            COMPONENT ${QPID_COMPONENT_BROKER})
 endif (BUILD_MSSQL)
+
+# Build the MS SQL-CLFS Storage Provider plugin; enabled by default on Windows only.
+set (msclfs_default ON)
+if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+  set(msclfs_default OFF)
+endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+option(BUILD_MSCLFS "Build MS hybrid SQL-CLFS Store provider plugin" ${msclfs_default})
+if (BUILD_MSCLFS)
+  add_library (msclfs_store MODULE
+               ms-clfs/MSSqlClfsProvider.cpp
+               ms-clfs/Log.cpp
+               ms-clfs/MessageLog.cpp
+               ms-clfs/Messages.cpp
+               ms-clfs/Transaction.cpp
+               ms-clfs/TransactionLog.cpp
+               ms-sql/BindingRecordset.cpp
+               ms-sql/BlobAdapter.cpp
+               ms-sql/BlobEncoder.cpp
+               ms-sql/BlobRecordset.cpp
+               ms-sql/DatabaseConnection.cpp
+               ms-sql/Recordset.cpp
+               ms-sql/State.cpp
+               ms-sql/VariantHelper.cpp)
+  include_directories(ms-sql)
+  target_link_libraries (msclfs_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} clfsw32.lib)
+  install (TARGETS msclfs_store # RUNTIME
+           DESTINATION ${QPIDD_MODULE_DIR}
+           COMPONENT ${QPID_COMPONENT_BROKER})
+endif (BUILD_MSCLFS)

Modified: qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1026175&r1=1026174&r2=1026175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Thu Oct 21 23:09:00 2010
@@ -61,11 +61,13 @@ MessageStorePlugin::StoreOptions::StoreO
 void
 MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
 {
-    qpid::broker::Broker* broker =
+    qpid::broker::Broker* b =
         dynamic_cast<qpid::broker::Broker*>(&target);
-    if (0 == broker)
+    if (0 == b)
         return;        // Only listen to Broker targets
 
+    broker = b;
+
     // See if there are any storage provider plugins ready. If not, we can't
     // do a message store.
     qpid::Plugin::earlyInitAll(*this);
@@ -412,8 +414,12 @@ MessageStorePlugin::recover(broker::Reco
     provider->second->recoverExchanges(recoverer, exchanges);
     provider->second->recoverQueues(recoverer, queues);
     provider->second->recoverBindings(recoverer, exchanges, queues);
-    provider->second->recoverTransactions(recoverer, dtxMap);
+    // Important to recover messages before transactions in the SQL-CLFS
+    // case. If this becomes a problem, it may be possible to resolve it.
+    // If in doubt please raise a jira and notify Steve Huston
+    // <sh...@riverace.com>.
     provider->second->recoverMessages(recoverer, messages, messageQueueMap);
+    provider->second->recoverTransactions(recoverer, dtxMap);
     // Enqueue msgs where needed.
     for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
          i != messageQueueMap.end();

Modified: qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.h?rev=1026175&r1=1026174&r2=1026175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.h Thu Oct 21 23:09:00 2010
@@ -57,7 +57,7 @@ class MessageStorePlugin :
     // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info?    public qpid::management::Manageable
 {
   public:
-    MessageStorePlugin() {}
+    MessageStorePlugin() : broker(0) {}  // broker stays null until earlyInitialize() sees a Broker target
 
     /**
      * @name Methods inherited from qpid::Plugin
@@ -264,6 +264,9 @@ class MessageStorePlugin :
     //    inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&)
     //        { return management::Manageable::STATUS_OK; }
 
+    // Lets a storage provider reach the broker; null until earlyInitialize() has recorded one.
+    broker::Broker *getBroker() { return broker; }
+
   protected:
 
     struct StoreOptions : public qpid::Options {
@@ -276,6 +279,8 @@ class MessageStorePlugin :
     ProviderMap providers;
     ProviderMap::const_iterator provider;
 
+    broker::Broker *broker;
+
 }; // class MessageStoreImpl
 
 }} // namespace qpid::store

Modified: qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h?rev=1026175&r1=1026174&r2=1026175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h Thu Oct 21 23:09:00 2010
@@ -50,6 +50,15 @@ struct QueueEntry {
     uint64_t queueId;
     TplStatus tplStatus;
     std::string xid;
+
+    QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
+        : queueId(id), tplStatus(tpl), xid(x) {}  // Only the queue id is required; status defaults to NONE, xid to empty.
+
+    bool operator==(const QueueEntry& rhs) const {    // Equal: same queue and (both NONE or same xid)
+        if (queueId != rhs.queueId) return false;
+        if (tplStatus == NONE && rhs.tplStatus == NONE) return true;  // xid ignored
+        return xid == rhs.xid;    // Note: the tplStatus values themselves are not compared
+    }
 };
 typedef std::map<uint64_t, std::vector<QueueEntry> > MessageQueueMap;
 typedef std::map<std::string, qpid::broker::RecoverableTransaction::shared_ptr>

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp Thu Oct 21 23:09:00 2010
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <stdlib.h>
+#include <qpid/sys/windows/check.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+Log::~Log()
+{
+    // Release only what open() actually acquired: marshalling area, then handle.
+    if (marshal != 0) ::DeleteLogMarshallingArea(marshal);
+    if (handle != INVALID_HANDLE_VALUE) ::CloseHandle(handle);
+}
+
+// Open the CLFS log at 'path', creating it if needed. A log with no containers
+// gets params.containers of them (params.containerSize each), then a marshalling
+// area is created. CLFS failures are reported via QPID_WINDOWS_CHECK_NOT.
+void
+Log::open(const std::string& path, const TuningParameters& params)
+{
+    this->containerSize = static_cast<ULONGLONG>(params.containerSize);
+    logPath = path;
+    // mbstowcs_s always terminates its output and counts the terminator in
+    // 'converted', so no extra NUL is written (it would be out of bounds).
+    std::vector<wchar_t> wide(path.length() + 1);
+    size_t converted;
+    mbstowcs_s(&converted, &wide[0], wide.size(), path.c_str(), path.length());
+    const std::wstring wLogPath(&wide[0]);
+    const std::wstring wLogSpec = L"log:" + wLogPath;
+    handle = ::CreateLogFile(wLogSpec.c_str(),
+                             GENERIC_WRITE | GENERIC_READ,
+                             0,
+                             0,
+                             OPEN_ALWAYS,
+                             0);
+    QPID_WINDOWS_CHECK_NOT(handle, INVALID_HANDLE_VALUE);
+    CLFS_INFORMATION info;
+    ULONG infoSize = sizeof(info);
+    BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+    if (info.TotalContainers == 0) {
+        // Build every container name before taking c_str(): pointers taken
+        // while the vector is still growing dangle once it reallocates.
+        std::vector<std::wstring> paths;
+        for (unsigned short i = 0; i < params.containers; ++i) {
+            std::wostringstream name;
+            name << wLogPath << L"-container-" << i;
+            paths.push_back(name.str());
+        }
+        std::vector<LPWSTR> cPaths;
+        for (size_t i = 0; i < paths.size(); ++i)
+            cPaths.push_back(const_cast<LPWSTR>(paths[i].c_str()));
+        ok = ::AddLogContainerSet(handle, params.containers,
+                                  &this->containerSize,
+                                  cPaths.empty() ? NULL : &cPaths[0], NULL);
+        QPID_WINDOWS_CHECK_NOT(ok, 0);
+    }
+    // Need a marshalling area
+    ok = ::CreateLogMarshallingArea(handle,
+                                    NULL, NULL, NULL,    // Alloc, free, context
+                                    marshallingBufferSize(),
+                                    params.maxWriteBuffers,
+                                    1,                   // Max read buffers
+                                    &marshal);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+}
+
+uint32_t
+Log::marshallingBufferSize()
+{
+  // Default implementation returns the minimum marshalling buffer size (one
+  // disk sector); derived ones should come up with a more fitting value.
+  //
+  // Query the directory part of the log path (last '\' or '/' excluded); with
+  // no separator at all, substr(0, npos) passes the whole path.
+  const size_t dirMarker = logPath.find_last_of("\\/");
+  DWORD bytesPerSector = 0;
+  DWORD dontCare;
+  // logPath is a narrow string, so call the ANSI entry point explicitly.
+  BOOL ok = ::GetDiskFreeSpaceA(logPath.substr(0, dirMarker).c_str(),
+                                &dontCare,
+                                &bytesPerSector,
+                                &dontCare,
+                                &dontCare);
+  QPID_WINDOWS_CHECK_NOT(ok, 0);   // Never return an uninitialized size
+  return bytesPerSector;
+}
+
+CLFS_LSN
+Log::write(void* entry, uint32_t length, CLFS_LSN* prev)
+{
+    // Append a single-buffer record with CLFS_FLAG_FORCE_FLUSH set and hand
+    // back the LSN CLFS assigned to it; prev is passed as the Previous LSN.
+    CLFS_WRITE_ENTRY record;
+    record.Buffer = entry;
+    record.ByteLength = length;
+    CLFS_LSN assigned;
+    BOOL ok =
+        ::ReserveAndAppendLog(marshal, &record, 1,    // One buffer descriptor
+                              0, prev,                // Undo-Next, Previous LSNs
+                              0, 0,                   // No reservations
+                              CLFS_FLAG_FORCE_FLUSH, &assigned, 0);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+    return assigned;
+}
+
+// Report the log's current base LSN as given by GetLogFileInformation.
+CLFS_LSN
+Log::getBase()
+{
+    CLFS_INFORMATION logInfo;
+    ULONG logInfoBytes = sizeof(logInfo);
+    BOOL ok = ::GetLogFileInformation(handle, &logInfo, &logInfoBytes);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+    return logInfo.BaseLsn;
+}
+
+void
+Log::moveTail(const CLFS_LSN& oldest)
+{
+    // AdvanceLogBase takes a non-const LSN pointer, hence the cast.
+    PCLFS_LSN newBase = const_cast<PCLFS_LSN>(&oldest);
+    BOOL ok = ::AdvanceLogBase(marshal, newBase, 0, NULL);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+}
+
+}}}  // namespace qpid::store::ms_clfs

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h Thu Oct 21 23:09:00 2010
@@ -0,0 +1,72 @@
+#ifndef QPID_STORE_MSCLFS_LOG_H
+#define QPID_STORE_MSCLFS_LOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <windows.h>
+#include <clfsw32.h>
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class Log
+ * Represents a CLFS-housed log; owns the log handle and marshalling area.
+ * NOTE(review): copyable as declared, yet the destructor frees both - confirm never copied.
+ */
+class Log {
+
+protected:
+    HANDLE handle;              // CLFS log handle; INVALID_HANDLE_VALUE until open() succeeds
+    ULONGLONG containerSize;    // Copied from TuningParameters in open(); size used when adding containers
+    std::string logPath;        // Path given to open(), without the "log:" prefix
+    PVOID marshal;              // Marshalling area created by open(); 0 before that
+
+public:
+    struct TuningParameters {
+        size_t containerSize;         // Size of each container
+        unsigned short containers;    // Containers to add when the log has none
+        uint32_t maxWriteBuffers;     // Passed to CreateLogMarshallingArea
+    };
+
+    Log() : handle(INVALID_HANDLE_VALUE), containerSize(0), marshal(0) {}
+    virtual ~Log();
+
+    void open(const std::string& path, const TuningParameters& params);  // Open/create the log and its marshalling area
+
+    virtual uint32_t marshallingBufferSize();  // Buffer size for the marshalling area; derived logs may override
+
+    CLFS_LSN write(void* entry, uint32_t length, CLFS_LSN* prev = 0);  // Append one record; returns its LSN
+
+    // Get the current base LSN of the log.
+    CLFS_LSN getBase();
+
+    // Move the log tail to the indicated LSN.
+    void moveTail(const CLFS_LSN& oldest);
+};
+
+}}}  // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_LOG_H */

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h Thu Oct 21 23:09:00 2010
@@ -0,0 +1,36 @@
+#ifndef QPID_STORE_MSCLFS_LSN_H
+#define QPID_STORE_MSCLFS_LSN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <clfsw32.h>
+
+namespace {
+    // Wrap a 64-bit id in a CLFS_LSN (stored in its Internal field).
+    inline CLFS_LSN idToLsn(const uint64_t val)
+    { CLFS_LSN wrapped; wrapped.Internal = val; return wrapped; }
+    // Recover the 64-bit id held in a CLFS_LSN's Internal field.
+    inline uint64_t lsnToId(const CLFS_LSN& lsn)
+    { return lsn.Internal; }
+}
+
+#endif /* QPID_STORE_MSCLFS_LSN_H */

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Thu Oct 21 23:09:00 2010
@@ -0,0 +1,1180 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <list>
+#include <map>
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <clfsw32.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StoreException.h>
+#include <qpid/store/StorageProvider.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
+
+// From ms-sql...
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+using qpid::store::ms_sql::BlobAdapter;
+using qpid::store::ms_sql::BlobRecordset;
+using qpid::store::ms_sql::BindingRecordset;
+using qpid::store::ms_sql::DatabaseConnection;
+using qpid::store::ms_sql::ADOException;
+using qpid::store::ms_sql::State;
+using qpid::store::ms_sql::VariantHelper;
+
+#include "Log.h"
+#include "Messages.h"
+#include "Transaction.h"
+#include "TransactionLog.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+        no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};  // Raise a COM error when an HRESULT reports failure
+
+// Database table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblQueue("tblQueue");
+
+/*
+ * Maintain a map of id -> QueueContents. The RWlock protecting the map allows
+ * concurrent reads, so multiple threads can look up the queue they need; each
+ * queue's own lock protects that QueueContents.
+ */
+struct QueueContents {
+    typedef boost::shared_ptr<QueueContents> shared_ptr;
+    qpid::sys::Mutex lock;           // Protects 'messages'
+    std::list<uint64_t> messages;    // presumably ids of the messages on this queue - TODO confirm
+};
+
+typedef std::map<uint64_t, QueueContents::shared_ptr> QueuesMap;
+qpid::sys::RWlock queuesLock;    // Protects 'queues'
+QueuesMap queues;                // id -> contents (presumably keyed by queue id - TODO confirm)
+
+}  // anonymous namespace
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class MSSqlClfsProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses a hybrid Microsoft
+ * SQL Server and Windows CLFS approach as the backend data store for Qpid.
+ */
+class MSSqlClfsProvider : public qpid::store::StorageProvider
+{
+protected:
+    void finalizeMe();
+
+    void dump();
+
+public:
+    MSSqlClfsProvider();
+    ~MSSqlClfsProvider();
+
+    virtual qpid::Options* getOptions() { return &options; }
+
+    virtual void earlyInitialize (Plugin::Target& target);
+    virtual void initialize(Plugin::Target& target);
+
+    /**
+     * Receive notification that this provider is the one that will actively
+     * handle provider storage for the target. If the provider is to be used,
+     * this method will be called after earlyInitialize() and before any
+     * recovery operations (recovery, in turn, precedes call to initialize()).
+     */
+    virtual void activate(MessageStorePlugin &store);
+
+    /**
+     * @name Methods inherited from qpid::broker::MessageStore
+     */
+    //@{
+    /**
+     * If called after init() but before recovery, will discard the database
+     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+     * is true, the content of the store dir will be moved to a backup dir
+     * inside the store dir. This is used when cluster nodes recover and must
+     * get their content from a cluster sync rather than directly from the
+     * store.
+     *
+     * @param pushDownStoreFiles If true, will move content of the store dir
+     *                           into a subdir, leaving the store dir
+     *                           otherwise empty.
+     */
+    virtual void truncateInit(const bool pushDownStoreFiles = false);
+
+    /**
+     * Record the existence of a durable queue
+     */
+    virtual void create(PersistableQueue& queue,
+                        const qpid::framing::FieldTable& args);
+    /**
+     * Destroy a durable queue
+     */
+    virtual void destroy(PersistableQueue& queue);
+
+    /**
+     * Record the existence of a durable exchange
+     */
+    virtual void create(const PersistableExchange& exchange,
+                        const qpid::framing::FieldTable& args);
+    /**
+     * Destroy a durable exchange
+     */
+    virtual void destroy(const PersistableExchange& exchange);
+
+    /**
+     * Record a binding
+     */
+    virtual void bind(const PersistableExchange& exchange,
+                      const PersistableQueue& queue,
+                      const std::string& key,
+                      const qpid::framing::FieldTable& args);
+
+    /**
+     * Forget a binding
+     */
+    virtual void unbind(const PersistableExchange& exchange,
+                        const PersistableQueue& queue,
+                        const std::string& key,
+                        const qpid::framing::FieldTable& args);
+
+    /**
+     * Record generic durable configuration
+     */
+    virtual void create(const PersistableConfig& config);
+
+    /**
+     * Destroy generic durable configuration
+     */
+    virtual void destroy(const PersistableConfig& config);
+
+    /**
+     * Stores a message before it has been enqueued
+     * (enqueueing automatically stores the message so this is
+     * only required if storage is required prior to that
+     * point). If the message has not yet been stored it will
+     * store the headers as well as any content passed in. A
+     * persistence id will be set on the message which can be
+     * used to load the content or to append to it.
+     */
+    virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+    /**
+     * Destroys a previously staged message. This only needs
+     * to be called if the message is never enqueued. (Once
+     * enqueued, deletion will be automatic when the message
+     * is dequeued from all queues it was enqueued onto).
+     */
+    virtual void destroy(PersistableMessage& msg);
+
+    /**
+     * Appends content to a previously staged message
+     */
+    virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+                               const std::string& data);
+
+    /**
+     * Loads (a section) of content data for the specified
+     * message (previously stored through a call to stage or
+     * enqueue) into data. The offset refers to the content
+     * only (i.e. an offset of 0 implies that the start of the
+     * content should be loaded, not the headers or related
+     * meta-data).
+     */
+    virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+                             const boost::intrusive_ptr<const PersistableMessage>& msg,
+                             std::string& data,
+                             uint64_t offset,
+                             uint32_t length);
+
+    /**
+     * Enqueues a message, storing the message if it has not
+     * been previously stored and recording that the given
+     * message is on the given queue.
+     *
+     * Note that this is async, so the return of the function does
+     * not mean the operation is complete.
+     *
+     * @param msg the message to enqueue
+     * @param queue the name of the queue onto which it is to be enqueued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue);
+
+    /**
+     * Dequeues a message, recording that the given message is
+     * no longer on the given queue and deleting the message
+     * if it is no longer on any other queue.
+     *
+     * Note that this is async, so the return of the function does
+     * not mean the operation is complete.
+     *
+     * @param msg the message to dequeue
+     * @param queue the name of the queue from which it is to be dequeued
+     * @param xid (a pointer to) an identifier of the
+     * distributed transaction in which the operation takes
+     * place or null for 'local' transactions
+     */
+    virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+                         const boost::intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue);
+
+    /**
+     * Flushes all async messages to disk for the specified queue
+     *
+     * Note: this is a no-op for this provider.
+     *
+     * @param queue the name of the queue from which it is to be dequeued
+     */
+    virtual void flush(const PersistableQueue& queue) {};
+
+    /**
+     * Returns the number of outstanding AIO's for a given queue
+     *
+     * If 0, then all the enqueues / dequeues have been stored
+     * to disk
+     *
+     * @param queue the name of the queue to check for outstanding AIO
+     */
+    virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+        {return 0;}
+    //@}
+
+    /**
+     * @name Methods inherited from qpid::broker::TransactionalStore
+     */
+    //@{
+    virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+    virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+    virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+    virtual void commit(qpid::broker::TransactionContext& txn);
+    virtual void abort(qpid::broker::TransactionContext& txn);
+    virtual void collectPreparedXids(std::set<std::string>& xids);
+    //@}
+
+    virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+    virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+                                  ExchangeMap& exchangeMap);
+    virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+                               QueueMap& queueMap);
+    virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+                                 const ExchangeMap& exchangeMap,
+                                 const QueueMap& queueMap);
+    virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+                                 MessageMap& messageMap,
+                                 MessageQueueMap& messageQueueMap);
+    virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+                                     PreparedTransactionMap& dtxMap);
+
+private:
+    struct ProviderOptions : public qpid::Options   // Settings exposed as broker options
+    {
+        std::string connectString;          // "connect"
+        std::string catalogName;            // "catalog"
+        std::string storeDir;               // "store-dir"
+        size_t containerSize;               // "container-size"
+        unsigned short initialContainers;   // "initial-containers"
+        uint32_t maxWriteBuffers;           // "max-write-buffers"
+
+        ProviderOptions(const std::string &name)
+            : qpid::Options(name),
+              catalogName("QpidStore"),
+              containerSize(1024 * 1024),
+              initialContainers(2),
+              maxWriteBuffers(10)
+        {
+            // Default: SQLEXPRESS on this machine. ANSI call: name goes in a std::string.
+            char myName[MAX_COMPUTERNAME_LENGTH + 1] = "";
+            DWORD myNameLen = sizeof(myName);
+            if (!::GetComputerNameA(myName, &myNameLen)) myName[0] = '\0';
+            connectString = "Data Source=";
+            connectString += myName;
+            connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+            addOptions()
+                ("connect",
+                 qpid::optValue(connectString, "STRING"),
+                 "Connection string for the database to use. Will prepend "
+                 "Provider=SQLOLEDB;")
+                ("catalog",
+                 qpid::optValue(catalogName, "DB NAME"),
+                 "Catalog (database) name")
+                ("store-dir",
+                 qpid::optValue(storeDir, "DIR"),
+                 "Location to store message and transaction data "
+                 "(default uses data-dir if available)")
+                ("container-size",
+                 qpid::optValue(containerSize, "VALUE"),
+                 "Bytes per container; min 512K. Only used when creating "
+                 "a new log")
+                ("initial-containers",
+                 qpid::optValue(initialContainers, "VALUE"),
+                 "Number of containers to add if creating a new log")
+                ("max-write-buffers",
+                 qpid::optValue(maxWriteBuffers, "VALUE"),
+                 "Maximum write buffers outstanding before log is flushed "
+                 "(0 means no limit)")
+                ;
+        }
+    };
+    // Option values; defaults are set in the ProviderOptions constructor.
+    ProviderOptions options;
+    std::string brokerDataDir;
+    // Message store; its log is opened in earlyInitialize().
+    Messages messages;
+    // TransactionLog requires itself to have a shared_ptr reference to start.
+    TransactionLog::shared_ptr transactions;
+
+    // Each thread has a separate connection to the database and also needs
+    // to manage its COM initialize/finalize individually. This is done by
+    // keeping a thread-specific State.
+    boost::thread_specific_ptr<State> dbState;
+
+    // Return the calling thread's State, creating it on first use.
+    State *initState();
+    // Return the calling thread's database connection, opening it with the
+    // configured connect string and catalog on first use.
+    DatabaseConnection *initConnection(void);
+    // Create database 'name' and the queue, exchange, config and binding
+    // tables in it, using connection 'db'.
+    void createDb(DatabaseConnection *db, const std::string &name);
+    void createLogs();
+};
+
+// The single provider instance, constructed during static initialization.
+// NOTE(review): the name says constructing it registers the plugin; that
+// presumably happens in a base-class constructor not visible here.
+static MSSqlClfsProvider static_instance_registers_plugin;
+
+/**
+ * Finalizer registered with the store plugin by earlyInitialize().
+ * Releases the State object held in dbState for the calling thread.
+ */
+void
+MSSqlClfsProvider::finalizeMe()
+{
+    this->dbState.reset();
+}
+
+/**
+ * Set up the option group and create the (not yet opened) transaction log.
+ * The log is created through make_shared because TransactionLog needs a
+ * shared_ptr reference to itself before it is used.
+ */
+MSSqlClfsProvider::MSSqlClfsProvider()
+    : options("MS SQL/CLFS Provider options"),
+      transactions(boost::make_shared<TransactionLog>())
+{
+}
+
+/** No explicit teardown is performed here. */
+MSSqlClfsProvider::~MSSqlClfsProvider()
+{
+    // Empty body.
+}
+
+/**
+ * Plugin hook run early in broker start-up.
+ *
+ * If target is the message store plugin this opens the CLFS message and
+ * transaction logs, makes sure the SQL catalog exists (creating it when it
+ * does not) and then offers this provider to the store as "MSSQL-CLFS".
+ * Any failure is logged and the provider is simply not registered, so the
+ * rest of the broker gets a chance to run; no exception leaves this method
+ * because of a log or database problem.
+ *
+ * @param target The plugin target; ignored unless it is a MessageStorePlugin.
+ */
+void
+MSSqlClfsProvider::earlyInitialize(Plugin::Target &target)
+{
+    MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+    if (!store)
+        return;
+
+    // Check the store dir option; if not specified, need to
+    // grab the broker's data dir.
+    if (options.storeDir.empty()) {
+        DataDir& dir = store->getBroker()->getDataDir();
+        if (dir.isEnabled()) {
+            options.storeDir = dir.getPath();
+        }
+        else {
+            QPID_LOG(error,
+                     "MSSQL-CLFS: --store-dir required if --no-data-dir specified");
+            return;
+        }
+    }
+
+    // If CLFS is not available on this system, give up now.
+    try {
+        Log::TuningParameters params;
+        params.containerSize = options.containerSize;
+        params.containers = options.initialContainers;
+        params.maxWriteBuffers = options.maxWriteBuffers;
+        std::string msgPath = options.storeDir + "\\" + "messages";
+        messages.openLog(msgPath, params);
+        std::string transPath = options.storeDir + "\\" + "transactions";
+        transactions->open(transPath, params);
+    }
+    catch (const std::exception &e) {
+        QPID_LOG(error, e.what());
+        return;
+    }
+
+    // If the database init fails, report it and don't register; give
+    // the rest of the broker a chance to run.
+    //
+    // Don't try to initConnection() since that will fail if the
+    // database doesn't exist. Instead, try to open a connection without
+    // a database name, then search for the database. There's still a
+    // chance this provider won't be selected for the store too, so be
+    // sure to close the database connection before return to avoid
+    // leaving a connection up that will not be used.
+    //
+    // The connection is declared outside the try block so the COM error
+    // handler can still ask it for the provider's error details.
+    std::auto_ptr<DatabaseConnection> db;
+    try {
+        initState();     // This initializes COM
+        db.reset(new DatabaseConnection());
+        db->open(options.connectString, "");
+        _ConnectionPtr conn(*db);
+        _RecordsetPtr pCatalogs = NULL;
+        VariantHelper<std::string> catalogName(options.catalogName);
+        pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+        if (pCatalogs->EndOfFile) {
+            // Database doesn't exist; create it
+            QPID_LOG(notice,
+                     "MSSQL-CLFS: Creating database " + options.catalogName);
+            createDb(db.get(), options.catalogName);
+        }
+        else {
+            QPID_LOG(notice,
+                     "MSSQL-CLFS: Database located: " + options.catalogName);
+        }
+        if (pCatalogs) {
+            if (pCatalogs->State == adStateOpen)
+                pCatalogs->Close();
+            pCatalogs = 0;
+        }
+        db->close();
+        store->providerAvailable("MSSQL-CLFS", this);
+    }
+    catch (qpid::Exception &e) {
+        QPID_LOG(error, e.what());
+        return;
+    }
+    catch (_com_error &e) {
+        // Raw ADO/COM failures (e.g. from OpenSchema) are not
+        // qpid::Exceptions; without this handler they would escape and
+        // defeat the "report it and don't register" intent above.
+        std::string errs;
+        try {
+            if (db.get() != 0)
+                errs = db->getErrors();
+        }
+        catch (...) {
+            // Best effort only: the provider detail is optional.
+        }
+        ADOException report("MSSQL-CLFS: database initialization failed",
+                            e,
+                            errs);
+        QPID_LOG(error, report.what());
+        return;
+    }
+    catch (std::exception &e) {
+        QPID_LOG(error, e.what());
+        return;
+    }
+    store->addFinalizer(boost::bind(&MSSqlClfsProvider::finalizeMe, this));
+}
+
+/**
+ * Plugin initialization hook. This provider does nothing at this stage;
+ * its setup is done in earlyInitialize().
+ */
+void
+MSSqlClfsProvider::initialize(Plugin::Target& /*target*/)
+{
+}
+
+/**
+ * Called when this provider is activated; only logs that it is up.
+ */
+void
+MSSqlClfsProvider::activate(MessageStorePlugin& /*store*/)
+{
+    QPID_LOG(info, "MS SQL/CLFS Provider is up");
+}
+
+/**
+ * Store truncate-on-initialize request. Not acted on by this provider:
+ * the body is empty.
+ */
+void
+MSSqlClfsProvider::truncateInit(const bool /*pushDownStoreFiles*/)
+{
+}
+
+/**
+ * Record the existence of a durable queue.
+ *
+ * The queue is added to the SQL queue table inside a database transaction
+ * and an empty in-memory QueueContents entry is registered under its
+ * persistence ID. COM errors are rethrown as ADOException; any other
+ * std::exception is rethrown through THROW_STORE_EXCEPTION. The database
+ * transaction is rolled back in both cases.
+ */
+void
+MSSqlClfsProvider::create(PersistableQueue& queue,
+                          const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+    DatabaseConnection * const conn = initConnection();
+    BlobRecordset queueTable;
+    try {
+        conn->beginTransaction();
+        queueTable.open(conn, TblQueue);
+        queueTable.add(queue);
+        {
+            // The SQL side worked; register an empty message list for the
+            // queue before committing.
+            QueueContents::shared_ptr contents(new QueueContents);
+            ::qpid::sys::RWlock::ScopedWlock guard(queuesLock);
+            queues[queue.getPersistenceId()] = contents;
+        }
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error creating queue " + queue.getName(),
+                           comErr,
+                           providerErrors);
+    }
+    catch(std::exception& failure) {
+        conn->rollbackTransaction();
+        THROW_STORE_EXCEPTION(failure.what());
+    }
+}
+
+/**
+ * Destroy a durable queue.
+ *
+ * Removes the queue's bindings and then the queue itself from the SQL
+ * tables in one database transaction, then drops the in-memory message
+ * list for the queue and dequeues each message that was on it. A queue
+ * that has no in-memory entry is tolerated: there is then nothing to
+ * dequeue.
+ *
+ * @throws ADOException if the SQL removal fails (after rolling back).
+ */
+void
+MSSqlClfsProvider::destroy(PersistableQueue& queue)
+{
+    DatabaseConnection *db = initConnection();
+    BlobRecordset rsQueues;
+    BindingRecordset rsBindings;
+    try {
+        db->beginTransaction();
+        rsQueues.open(db, TblQueue);
+        rsBindings.open(db, TblBinding);
+        // Remove bindings first; the queue IDs can't be ripped out from
+        // under the references in the bindings table.
+        rsBindings.removeForQueue(queue.getPersistenceId());
+        rsQueues.remove(queue);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        db->rollbackTransaction();
+        throw ADOException("Error deleting queue " + queue.getName(), e, errs);
+    }
+
+    /*
+     * Now that the SQL stuff has recorded the queue deletion, reflect
+     * all the dequeued messages in memory. Don't worry about any errors
+     * that occur while reflecting these in the log because:
+     *   - If we have to recover from this point (or anywhere from here
+     *     until all messages are dequeued) there's no valid queue ID
+     *     from the Enqueue record, so recovery will throw it out anyway.
+     *   - If there is a failure before the SQL changes commit, the
+     *     existing Enqueue records will replace the message on the
+     *     queue during recovery.
+     * so, the best we could do by logging these dequeue operations is
+     * record something that will need to be ignored during recovery.
+     *
+     * Obtain a write lock to the queue map. Doing so gets this thread
+     * exclusive access to the queue map. This means no other thread can
+     * come while we're holding it and access, even for read, the list.
+     * However, there may already be other previously obtained references
+     * to the queue's message list outstanding, so also get the queue's
+     * list lock to serialize with any other threads. We should be able
+     * to count on the broker not making the destroy() call while other
+     * uses of the queue are outstanding, but play it safe.
+     */
+    std::list<uint64_t> affectedMessages;
+    const uint64_t qId = queue.getPersistenceId();
+    {
+        ::qpid::sys::RWlock::ScopedWlock l(queuesLock);
+        // Look the queue up with find(): operator[] would insert a null
+        // entry for an unknown ID, which would then be dereferenced.
+        QueuesMap::iterator i = queues.find(qId);
+        if (i != queues.end()) {
+            QueueContents::shared_ptr q = i->second;
+            if (q) {
+                ::qpid::sys::Mutex::ScopedLock ql(q->lock);
+                affectedMessages = q->messages;
+            }
+            queues.erase(i);
+        }
+    }
+    // Now tell each of the messages they are less one queue commitment.
+    // These dequeues are not part of any transaction.
+    Transaction::shared_ptr nonTransactional;
+    BOOST_FOREACH(uint64_t msgId, affectedMessages) {
+        messages.dequeue(msgId, qId, nonTransactional);
+    }
+}
+
+/**
+ * Record the existence of a durable exchange.
+ *
+ * The exchange is added to the SQL exchange table inside a database
+ * transaction. On a COM error the transaction is rolled back and an
+ * ADOException carrying the provider's error text is thrown.
+ */
+void
+MSSqlClfsProvider::create(const PersistableExchange& exchange,
+                          const qpid::framing::FieldTable& args)
+{
+    DatabaseConnection * const conn = initConnection();
+    BlobRecordset exchangeTable;
+    try {
+        conn->beginTransaction();
+        exchangeTable.open(conn, TblExchange);
+        exchangeTable.add(exchange);
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error creating exchange " + exchange.getName(),
+                           comErr,
+                           providerErrors);
+    }
+}
+
+/**
+ * Destroy a durable exchange.
+ *
+ * Removes the exchange's bindings and then the exchange itself in a
+ * single database transaction. On a COM error the transaction is rolled
+ * back and an ADOException is thrown.
+ */
+void
+MSSqlClfsProvider::destroy(const PersistableExchange& exchange)
+{
+    DatabaseConnection * const conn = initConnection();
+    BlobRecordset exchangeTable;
+    BindingRecordset bindingTable;
+    try {
+        conn->beginTransaction();
+        exchangeTable.open(conn, TblExchange);
+        bindingTable.open(conn, TblBinding);
+        // Bindings go first: the bindings table refers to the exchange
+        // IDs, so those rows cannot be removed while still referenced.
+        bindingTable.removeForExchange(exchange.getPersistenceId());
+        exchangeTable.remove(exchange);
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error deleting exchange " + exchange.getName(),
+                           comErr,
+                           providerErrors);
+    }
+}
+
+/**
+ * Record a binding.
+ *
+ * Adds a row for (exchange ID, queue ID, key, args) to the SQL binding
+ * table inside a database transaction. On a COM error the transaction is
+ * rolled back and an ADOException is thrown.
+ */
+void
+MSSqlClfsProvider::bind(const PersistableExchange& exchange,
+                        const PersistableQueue& queue,
+                        const std::string& key,
+                        const qpid::framing::FieldTable& args)
+{
+    DatabaseConnection * const conn = initConnection();
+    BindingRecordset bindingTable;
+    try {
+        conn->beginTransaction();
+        bindingTable.open(conn, TblBinding);
+        bindingTable.add(exchange.getPersistenceId(),
+                         queue.getPersistenceId(),
+                         key,
+                         args);
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error binding exchange " + exchange.getName() +
+                           " to queue " + queue.getName(),
+                           comErr,
+                           providerErrors);
+    }
+}
+
+/**
+ * Forget a binding.
+ *
+ * Removes the (exchange ID, queue ID, key, args) row from the SQL binding
+ * table inside a database transaction. On a COM error the transaction is
+ * rolled back and an ADOException is thrown.
+ */
+void
+MSSqlClfsProvider::unbind(const PersistableExchange& exchange,
+                          const PersistableQueue& queue,
+                          const std::string& key,
+                          const qpid::framing::FieldTable& args)
+{
+    DatabaseConnection * const conn = initConnection();
+    BindingRecordset bindingTable;
+    try {
+        conn->beginTransaction();
+        bindingTable.open(conn, TblBinding);
+        bindingTable.remove(exchange.getPersistenceId(),
+                            queue.getPersistenceId(),
+                            key,
+                            args);
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error unbinding exchange " + exchange.getName() +
+                           " from queue " + queue.getName(),
+                           comErr,
+                           providerErrors);
+    }
+}
+
+/**
+ * Record generic durable configuration.
+ *
+ * Adds the config to the SQL config table inside a database transaction.
+ * On a COM error the transaction is rolled back and an ADOException is
+ * thrown.
+ */
+void
+MSSqlClfsProvider::create(const PersistableConfig& config)
+{
+    DatabaseConnection * const conn = initConnection();
+    BlobRecordset configTable;
+    try {
+        conn->beginTransaction();
+        configTable.open(conn, TblConfig);
+        configTable.add(config);
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error creating config " + config.getName(),
+                           comErr,
+                           providerErrors);
+    }
+}
+
+/**
+ * Destroy generic durable configuration.
+ *
+ * Removes the config from the SQL config table inside a database
+ * transaction. On a COM error the transaction is rolled back and an
+ * ADOException is thrown.
+ */
+void
+MSSqlClfsProvider::destroy(const PersistableConfig& config)
+{
+    DatabaseConnection * const conn = initConnection();
+    BlobRecordset configTable;
+    try {
+        conn->beginTransaction();
+        configTable.open(conn, TblConfig);
+        configTable.remove(config);
+        conn->commitTransaction();
+    }
+    catch(_com_error &comErr) {
+        const std::string providerErrors = conn->getErrors();
+        conn->rollbackTransaction();
+        throw ADOException("Error deleting config " + config.getName(),
+                           comErr,
+                           providerErrors);
+    }
+}
+
+/**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ *
+ * NOTE: the description above is the store interface contract. In this
+ * provider the whole body is compiled out with "#if 0", so the call
+ * currently does nothing. The disabled code is the SQL message-table
+ * implementation.
+ */
+void
+MSSqlClfsProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+#if 0
+    DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
+    try {
+        db->beginTransaction();
+        rsMessages.open(db, TblMessage);
+        rsMessages.add(msg);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        db->rollbackTransaction();
+        throw ADOException("Error staging message", e, errs);
+    }
+#endif
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ *
+ * NOTE: in this provider the whole body is compiled out with "#if 0",
+ * so the call currently does nothing. The disabled code is the SQL
+ * message-table implementation.
+ */
+void
+MSSqlClfsProvider::destroy(PersistableMessage& msg)
+{
+#if 0
+    DatabaseConnection *db = initConnection();
+    BlobRecordset rsMessages;
+    try {
+        db->beginTransaction();
+        rsMessages.open(db, TblMessage);
+        rsMessages.remove(msg);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        db->rollbackTransaction();
+        throw ADOException("Error deleting message", e, errs);
+    }
+#endif
+}
+
+/**
+ * Appends content to a previously staged message
+ *
+ * NOTE: in this provider the whole body is compiled out with "#if 0",
+ * so the call currently does nothing and 'data' is not stored. The
+ * disabled code is the SQL message-table implementation.
+ */
+void
+MSSqlClfsProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+                                 const std::string& data)
+{
+#if 0
+    DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
+    try {
+        db->beginTransaction();
+        rsMessages.open(db, TblMessage);
+        rsMessages.append(msg, data);
+        db->commitTransaction();
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        db->rollbackTransaction();
+        throw ADOException("Error appending to message", e, errs);
+    }  
+#endif
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ *
+ * NOTE: in this provider the whole body is compiled out with "#if 0",
+ * so the call currently does nothing and 'data' is left untouched. The
+ * disabled code is the SQL message-table implementation.
+ */
+void
+MSSqlClfsProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+                               const boost::intrusive_ptr<const PersistableMessage>& msg,
+                               std::string& data,
+                               uint64_t offset,
+                               uint32_t length)
+{
+#if 0
+    // SQL store keeps all messages in one table, so we don't need the
+    // queue reference.
+    DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
+    try {
+        rsMessages.open(db, TblMessage);
+        rsMessages.loadContent(msg, data, offset, length);
+    }
+    catch(_com_error &e) {
+        std::string errs = db->getErrors();
+        throw ADOException("Error loading message content", e, errs);
+    }
+#endif
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * The transaction is taken from either a local TransactionContext or a
+ * two-phase-commit TPCTransactionContext, the same two kinds commit() and
+ * abort() accept. With a null or unrecognized context the enqueue is
+ * non-transactional.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ *
+ * Throws a store exception if the queue is not known to this provider.
+ */
+void
+MSSqlClfsProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+                           const boost::intrusive_ptr<PersistableMessage>& msg,
+                           const PersistableQueue& queue)
+{
+    Transaction::shared_ptr t;
+    TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt);
+    if (ctx != 0) {
+        t = ctx->getTransaction();
+    }
+    else {
+        // Contexts handed out by begin(xid) are TPCTransactionContexts;
+        // without this check their enqueues would be treated as
+        // non-transactional.
+        TPCTransactionContext *tctx =
+            dynamic_cast<TPCTransactionContext*> (ctxt);
+        if (tctx != 0)
+            t = tctx->getTransaction();
+    }
+    uint64_t qId = queue.getPersistenceId();
+    uint64_t msgId = msg->getPersistenceId();
+    QueueContents::shared_ptr q;
+    {
+        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock);
+        QueuesMap::iterator i = queues.find(qId);
+        if (i == queues.end())
+            THROW_STORE_EXCEPTION("Queue does not exist");
+        q = i->second;
+    }
+    // A zero ID means the message has not been stored yet.
+    if (msgId == 0) {
+        messages.add(msg);
+        msgId = msg->getPersistenceId();
+    }
+    messages.enqueue(msgId, qId, t);
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock);
+        q->messages.push_back(msgId);
+    }
+    msg->enqueueComplete();
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * The transaction is taken from either a local TransactionContext or a
+ * two-phase-commit TPCTransactionContext, the same two kinds commit() and
+ * abort() accept. With a null or unrecognized context the dequeue is
+ * non-transactional.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ *
+ * Throws a store exception if the queue is not known to this provider.
+ */
+void
+MSSqlClfsProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+                           const boost::intrusive_ptr<PersistableMessage>& msg,
+                           const PersistableQueue& queue)
+{
+    Transaction::shared_ptr t;
+    TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt);
+    if (ctx != 0) {
+        t = ctx->getTransaction();
+    }
+    else {
+        // Contexts handed out by begin(xid) are TPCTransactionContexts;
+        // without this check their dequeues would be treated as
+        // non-transactional.
+        TPCTransactionContext *tctx =
+            dynamic_cast<TPCTransactionContext*> (ctxt);
+        if (tctx != 0)
+            t = tctx->getTransaction();
+    }
+    uint64_t qId = queue.getPersistenceId();
+    uint64_t msgId = msg->getPersistenceId();
+    QueueContents::shared_ptr q;
+    {
+        qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock);
+        QueuesMap::const_iterator i = queues.find(qId);
+        if (i == queues.end())
+            THROW_STORE_EXCEPTION("Queue does not exist");
+        q = i->second;
+    }
+    messages.dequeue(msgId, qId, t);
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock);
+        q->messages.remove(msgId);
+    }
+    msg->dequeueComplete();
+}
+
+/**
+ * Begin a local transaction in the transaction log and return a context
+ * object wrapping it. The caller owns the returned context.
+ */
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlClfsProvider::begin()
+{
+    Transaction::shared_ptr started = transactions->begin();
+    return std::auto_ptr<qpid::broker::TransactionContext>(
+        new TransactionContext(started));
+}
+
+/**
+ * Begin a two-phase-commit transaction identified by xid in the
+ * transaction log and return a context object wrapping it. The caller
+ * owns the returned context.
+ */
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlClfsProvider::begin(const std::string& xid)
+{
+    TPCTransaction::shared_ptr started = transactions->begin(xid);
+    return std::auto_ptr<qpid::broker::TPCTransactionContext>(
+        new TPCTransactionContext(started));
+}
+
+/**
+ * Prepare a two-phase-commit transaction.
+ *
+ * Throws qpid::broker::InvalidTransactionContextException if txn is not
+ * one of this provider's TPCTransactionContext objects.
+ */
+void
+MSSqlClfsProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+    TPCTransactionContext * const tpc =
+        dynamic_cast<TPCTransactionContext*>(&txn);
+    if (!tpc)
+        throw qpid::broker::InvalidTransactionContextException();
+    tpc->getTransaction()->prepare();
+}
+
+/**
+ * Commit the transaction carried by txn, which may be either a local
+ * TransactionContext or a TPCTransactionContext.
+ *
+ * Throws qpid::broker::InvalidTransactionContextException if txn is
+ * neither of those.
+ */
+void
+MSSqlClfsProvider::commit(qpid::broker::TransactionContext& txn)
+{
+    Transaction::shared_ptr xact;
+    if (TransactionContext *local = dynamic_cast<TransactionContext*>(&txn)) {
+        xact = local->getTransaction();
+    }
+    else if (TPCTransactionContext *tpc =
+                 dynamic_cast<TPCTransactionContext*>(&txn)) {
+        xact = tpc->getTransaction();
+    }
+    else {
+        throw qpid::broker::InvalidTransactionContextException();
+    }
+    xact->commit(messages);
+}
+
+/**
+ * Abort the transaction carried by txn, which may be either a local
+ * TransactionContext or a TPCTransactionContext.
+ *
+ * Throws qpid::broker::InvalidTransactionContextException if txn is
+ * neither of those.
+ */
+void
+MSSqlClfsProvider::abort(qpid::broker::TransactionContext& txn)
+{
+    Transaction::shared_ptr xact;
+    if (TransactionContext *local = dynamic_cast<TransactionContext*>(&txn)) {
+        xact = local->getTransaction();
+    }
+    else if (TPCTransactionContext *tpc =
+                 dynamic_cast<TPCTransactionContext*>(&txn)) {
+        xact = tpc->getTransaction();
+    }
+    else {
+        throw qpid::broker::InvalidTransactionContextException();
+    }
+    xact->abort(messages);
+}
+
+/**
+ * Add the xid of every prepared transaction reported by the transaction
+ * log to xids. Existing contents of xids are kept.
+ */
+void
+MSSqlClfsProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+    typedef std::map<std::string, TPCTransaction::shared_ptr> PreparedMap;
+    PreparedMap prepared;
+    transactions->collectPreparedXids(prepared);
+    for (PreparedMap::const_iterator entry = prepared.begin();
+         entry != prepared.end();
+         ++entry)
+        xids.insert(entry->first);
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+/**
+ * Recover durable configuration objects from the SQL config table. Each
+ * row's blob is handed to the recoverer and the resulting object is given
+ * the row's persistence ID.
+ */
+void
+MSSqlClfsProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+    DatabaseConnection *conn = initConnection();
+    BlobRecordset configTable;
+    configTable.open(conn, TblConfig);
+    _RecordsetPtr rows = (_RecordsetPtr)configTable;
+    // BOF and EOF both set means the recordset has no rows at all.
+    const bool noRows = rows->BOF && rows->EndOfFile;
+    if (noRows)
+        return;
+    for (rows->MoveFirst(); !rows->EndOfFile; rows->MoveNext()) {
+        uint64_t id = rows->Fields->Item["persistenceId"]->Value;
+        long blobSize = rows->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = rows->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        // Rebuild the config object from its blob, then restore its ID.
+        broker::RecoverableConfig::shared_ptr recovered =
+            recoverer.recoverConfig(blob);
+        recovered->setPersistenceId(id);
+    }
+}
+
+/**
+ * Recover durable exchanges from the SQL exchange table. Each recovered
+ * exchange gets its stored persistence ID back and is recorded in
+ * exchangeMap under that ID.
+ */
+void
+MSSqlClfsProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+                                    ExchangeMap& exchangeMap)
+{
+    DatabaseConnection *conn = initConnection();
+    BlobRecordset exchangeTable;
+    exchangeTable.open(conn, TblExchange);
+    _RecordsetPtr rows = (_RecordsetPtr)exchangeTable;
+    // BOF and EOF both set means the recordset has no rows at all.
+    const bool noRows = rows->BOF && rows->EndOfFile;
+    if (noRows)
+        return;
+    for (rows->MoveFirst(); !rows->EndOfFile; rows->MoveNext()) {
+        uint64_t id = rows->Fields->Item["persistenceId"]->Value;
+        long blobSize = rows->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = rows->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        // Rebuild the exchange from its blob, restore its ID, and keep
+        // it in the map so binding recovery can find it.
+        broker::RecoverableExchange::shared_ptr recovered =
+            recoverer.recoverExchange(blob);
+        recovered->setPersistenceId(id);
+        exchangeMap[id] = recovered;
+    }
+}
+
+/**
+ * Recover durable queues from the SQL queue table. Each recovered queue
+ * gets its stored persistence ID back, is recorded in queueMap under that
+ * ID, and gets an empty in-memory QueueContents entry in the provider's
+ * queue map.
+ */
+void
+MSSqlClfsProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+                                 QueueMap& queueMap)
+{
+    DatabaseConnection *conn = initConnection();
+    BlobRecordset queueTable;
+    queueTable.open(conn, TblQueue);
+    _RecordsetPtr rows = (_RecordsetPtr)queueTable;
+    // BOF and EOF both set means the recordset has no rows at all.
+    const bool noRows = rows->BOF && rows->EndOfFile;
+    if (noRows)
+        return;
+    for (rows->MoveFirst(); !rows->EndOfFile; rows->MoveNext()) {
+        uint64_t id = rows->Fields->Item["persistenceId"]->Value;
+        long blobSize = rows->Fields->Item["fieldTableBlob"]->ActualSize;
+        BlobAdapter blob(blobSize);
+        blob = rows->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+        // Rebuild the queue from its blob and restore its ID.
+        broker::RecoverableQueue::shared_ptr recovered =
+            recoverer.recoverQueue(blob);
+        recovered->setPersistenceId(id);
+        queueMap[id] = recovered;
+        // Start the queue off with an empty message list.
+        QueueContents::shared_ptr contents(new QueueContents);
+        queues[id] = contents;
+    }
+}
+
+/**
+ * Recover bindings by opening the SQL binding table and letting the
+ * binding recordset do the recovery against the given exchange and queue
+ * maps.
+ */
+void
+MSSqlClfsProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+                                   const ExchangeMap& exchangeMap,
+                                   const QueueMap& queueMap)
+{
+    DatabaseConnection *conn = initConnection();
+    BindingRecordset bindingTable;
+    bindingTable.open(conn, TblBinding);
+    bindingTable.recover(recoverer, exchangeMap, queueMap);
+}
+
+/**
+ * Recover messages. The transaction log is recovered first; the
+ * transactions it yields are then passed to the message recovery along
+ * with the caller's maps.
+ */
+void
+MSSqlClfsProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+                                   MessageMap& messageMap,
+                                   MessageQueueMap& messageQueueMap)
+{
+    typedef std::map<uint64_t, Transaction::shared_ptr> RecoveredTransactions;
+    RecoveredTransactions recoveredTxns;
+    transactions->recover(recoveredTxns);
+    messages.recover(recoverer, messageMap, messageQueueMap, recoveredTxns);
+}
+
+/**
+ * Recover prepared transactions. Every prepared transaction reported by
+ * the transaction log is wrapped in a TPCTransactionContext, handed to
+ * the recoverer, and the result is stored in dtxMap under its xid.
+ */
+void
+MSSqlClfsProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+                                       PreparedTransactionMap& dtxMap)
+{
+    typedef std::map<std::string, TPCTransaction::shared_ptr> PreparedMap;
+    PreparedMap prepared;
+    transactions->collectPreparedXids(prepared);
+    for (PreparedMap::const_iterator entry = prepared.begin();
+         entry != prepared.end();
+         ++entry) {
+        // The recoverer receives the context as the broker's base type.
+        std::auto_ptr<qpid::broker::TPCTransactionContext> brokerCtx(
+            new TPCTransactionContext(entry->second));
+        dtxMap[entry->first] =
+            recoverer.recoverTransaction(entry->first, brokerCtx);
+    }
+}
+
+////////////// Internal Methods
+
+/**
+ * Return the calling thread's State object, creating it and storing it
+ * in the thread-specific dbState slot the first time through.
+ */
+State *
+MSSqlClfsProvider::initState()
+{
+    if (State *existing = dbState.get())
+        return existing;         // This thread is already set up
+    State *fresh = new State;
+    dbState.reset(fresh);
+    return fresh;
+}
+  
+/**
+ * Return the calling thread's database connection. If the thread's State
+ * has none yet, a connection is opened with the configured connect string
+ * and catalog name and stored in the State for reuse.
+ */
+DatabaseConnection *
+MSSqlClfsProvider::initConnection(void)
+{
+    State * const state = initState();
+    if (state->dbConn == 0) {
+        // Hold the new connection in an auto_ptr until open() succeeds so
+        // it is deleted if open() throws.
+        std::auto_ptr<DatabaseConnection> fresh(new DatabaseConnection);
+        fresh->open(options.connectString, options.catalogName);
+        state->dbConn = fresh.release();
+    }
+    return state->dbConn;
+}
+
+/**
+ * Create the database and the tables this provider keeps in SQL.
+ *
+ * Executes, in order: CREATE DATABASE, USE, then CREATE TABLE for the
+ * queue, exchange, config and binding tables. A COM error from any of the
+ * statements is rethrown as an ADOException.
+ *
+ * @param db   Open connection to run the statements on.
+ * @param name Name of the database to create.
+ */
+void
+MSSqlClfsProvider::createDb(DatabaseConnection *db, const std::string &name)
+{
+    const std::string idBlobColumns =
+        " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+        "  fieldTableBlob varbinary(MAX) NOT NULL)";
+    const std::string bindingColumns =
+        " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+        "  queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+        "  routingKey varchar(255),"
+        "  fieldTableBlob varbinary(MAX))";
+    const std::string createDbSql = "CREATE DATABASE " + name;
+    const std::string useDbSql = "USE " + name;
+    const std::string createTable("CREATE TABLE ");
+
+    _variant_t ignored;
+    _bstr_t createDbStmt = createDbSql.c_str();
+    _ConnectionPtr conn(*db);
+    try {
+        conn->Execute(createDbStmt, &ignored, adExecuteNoRecords);
+        _bstr_t useDbStmt = useDbSql.c_str();
+        conn->Execute(useDbStmt, &ignored, adExecuteNoRecords);
+
+        // The binding table's columns reference the queue and exchange
+        // tables, so it is created after them.
+        const std::string tableSql[] = {
+            createTable + TblQueue + idBlobColumns,
+            createTable + TblExchange + idBlobColumns,
+            createTable + TblConfig + idBlobColumns,
+            createTable + TblBinding + bindingColumns
+        };
+        const size_t tableCount = sizeof(tableSql) / sizeof(tableSql[0]);
+        for (size_t t = 0; t < tableCount; ++t) {
+            _bstr_t tableStmt = tableSql[t].c_str();
+            conn->Execute(tableStmt, &ignored, adExecuteNoRecords);
+        }
+    }
+    catch(_com_error &comErr) {
+        throw ADOException("MSSQL can't create " + name,
+                           comErr,
+                           db->getErrors());
+    }
+}
+
+/**
+ * Diagnostic hook. Currently it only logs a notice saying that nothing
+ * is dumped.
+ */
+void
+MSSqlClfsProvider::dump()
+{
+    // Placeholder: no database records are written to the log yet.
+    QPID_LOG(notice, "DB Dump: (not dumping anything)");
+}
+
+
+}}} // namespace qpid::store::ms_sql

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp Thu Oct 21 23:09:00 2010
@@ -0,0 +1,365 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "MessageLog.h"
+#include "Lsn.h"
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start;
+// MessageLog::recover() reads that leading field to decide how to interpret
+// the rest of the record.
+enum MessageEntryType {
+    MessageStartEntry           = 1,
+    MessageChunkEntry           = 2,
+    MessageDeleteEntry          = 3,
+    MessageEnqueueEntry         = 4,
+    MessageDequeueEntry         = 5
+};
+// Capacity, in bytes, of the content area of a single Message-Start or
+// Message-Chunk record.
+static const uint32_t MaxMessageContentLength = 64 * 1024;
+
+// Message-Start: first (possibly only) record of an encoded message.
+struct MessageStart {
+    MessageEntryType type;
+    // If the complete message encoding doesn't fit, remainder is in
+    // MessageChunk records to follow.
+    uint32_t totalLength;       // Length of the complete encoded message
+    uint32_t segmentLength;     // Number of bytes of content in this record
+    // Encoded message bytes. Left uninitialized by the constructor;
+    // MessageLog::add() fills in segmentLength bytes and writes the record
+    // without the unused tail of this array.
+    char content[MaxMessageContentLength];
+
+    MessageStart()
+        : type(MessageStartEntry), totalLength(0), segmentLength(0) {}
+};
+// Message-Chunk: continuation of a message whose encoding did not fit in
+// the Message-Start record.
+struct MessageChunk {
+    MessageEntryType type;
+    uint32_t segmentLength;     // Number of bytes of content in this record
+    // Encoded message bytes; not initialized by the constructor.
+    char content[MaxMessageContentLength];
+
+    MessageChunk() : type(MessageChunkEntry), segmentLength(0) {}
+};
+// Message-Delete: carries no data besides its type. The affected message is
+// identified by the LSN supplied when the record is written.
+struct MessageDelete {
+    MessageEntryType type;
+
+    MessageDelete() : type(MessageDeleteEntry) {}
+};
+// Message-Enqueue: notes that a message was enqueued on queueId, under
+// transaction transId.
+struct MessageEnqueue {
+    MessageEntryType type;
+    uint64_t queueId;
+    uint64_t transId;
+
+    MessageEnqueue(uint64_t qId = 0, uint64_t tId = 0)
+        : type(MessageEnqueueEntry), queueId(qId), transId(tId) {}
+};
+// Message-Dequeue: notes that a message was dequeued from queueId, under
+// transaction transId.
+struct MessageDequeue {
+    MessageEntryType type;
+    uint64_t queueId;
+    uint64_t transId;
+
+    MessageDequeue(uint64_t qId = 0, uint64_t tId = 0)
+        : type(MessageDequeueEntry), queueId(qId), transId(tId) {}
+};
+
+}   // namespace
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+// Report the marshalling buffer size needed for the records this class
+// writes. The result is never smaller than Log::marshallingBufferSize() and
+// is always a whole multiple of it.
+uint32_t
+MessageLog::marshallingBufferSize()
+{
+    // The larger of these two record types sets the space requirement.
+    const uint32_t largestRecord = static_cast<uint32_t>(
+        std::max(sizeof(MessageStart), sizeof(MessageEnqueue)));
+    const uint32_t baseSize = Log::marshallingBufferSize();
+    if (largestRecord > baseSize) {
+        // Advance to a following multiple of the base size. Note that a
+        // record size which is already an exact multiple still gains one
+        // more base-size unit.
+        const uint32_t units = (largestRecord + baseSize) / baseSize;
+        return units * baseSize;
+    }
+    return baseSize;
+}
+
+// Encode msg and write it to the log.
+//
+// A message whose encoding fits in one record is encoded directly into a
+// Message-Start record. A longer message is encoded into a staging buffer;
+// the first MaxMessageContentLength bytes go in the Message-Start record and
+// the rest follow in Message-Chunk records, each written with the
+// Message-Start record's LSN so recovery can tie them to the message.
+//
+// @param msg  The message to store.
+// @return     The persistence id for the message, derived from the LSN of
+//             its Message-Start record.
+uint64_t
+MessageLog::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+    MessageStart entry;
+    const uint32_t encodedMessageLength = msg->encodedSize();
+    entry.totalLength = encodedMessageLength;
+    const bool oneRecord = encodedMessageLength <= MaxMessageContentLength;
+    // Staging area, used only for multi-record messages. A vector owns the
+    // array so it is released with the matching array deallocation.
+    std::vector<char> encodeStage;
+    char *encodeBuff = 0;
+    if (oneRecord) {
+        // Optimize the common case: encode straight into the record.
+        encodeBuff = entry.content;
+        entry.segmentLength = encodedMessageLength;
+    }
+    else {
+        // Non-empty here since encodedMessageLength exceeds the maximum.
+        encodeStage.resize(encodedMessageLength);
+        encodeBuff = &encodeStage[0];
+        entry.segmentLength = MaxMessageContentLength;
+    }
+    qpid::framing::Buffer buff(encodeBuff, encodedMessageLength);
+    msg->encode(buff);
+    if (!oneRecord)
+        memcpy_s(entry.content, sizeof(entry.content),
+                 encodeBuff, entry.segmentLength);
+    // Don't write the unused tail of the content area.
+    uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+    entryLength -= (MaxMessageContentLength - entry.segmentLength);
+    CLFS_LSN location = write(&entry, entryLength);
+    // Write any Message-Chunk records before returning the message's id.
+    uint32_t sent = entry.segmentLength;
+    uint32_t remaining = encodedMessageLength - entry.segmentLength;
+    while (remaining > 0) {
+        MessageChunk chunk;
+        // Each chunk carries as much as fits, but never more than is left.
+        chunk.segmentLength = std::min(MaxMessageContentLength, remaining);
+        memcpy_s(chunk.content, sizeof(chunk.content),
+                 encodeBuff + sent, chunk.segmentLength);
+        entryLength = static_cast<uint32_t>(sizeof(chunk));
+        entryLength -= (MaxMessageContentLength - chunk.segmentLength);
+        write(&chunk, entryLength, &location);
+        sent += chunk.segmentLength;
+        remaining -= chunk.segmentLength;
+    }
+    return lsnToId(location);
+}
+
+// Log a Message-Delete record for messageId. A nonzero newFirstId names the
+// message the log tail is then moved up to; zero leaves the tail alone.
+void
+MessageLog::deleteMessage(uint64_t messageId, uint64_t newFirstId)
+{
+    CLFS_LSN targetLsn = idToLsn(messageId);
+    MessageDelete record;
+    write(&record, sizeof(record), &targetLsn);
+    if (newFirstId == 0)
+        return;
+    moveTail(idToLsn(newFirstId));
+}
+
+// Log a Message-Enqueue record noting that messageId was enqueued on queueId
+// under transactionId. The record is written together with the message's
+// own LSN.
+void
+MessageLog::recordEnqueue (uint64_t messageId,
+                           uint64_t queueId,
+                           uint64_t transactionId)
+{
+    CLFS_LSN owningMsgLsn = idToLsn(messageId);
+    MessageEnqueue record(queueId, transactionId);
+    write(&record, sizeof(record), &owningMsgLsn);
+}
+
+// Log a Message-Dequeue record noting that messageId was dequeued from
+// queueId under transactionId. The record is written together with the
+// message's own LSN.
+void
+MessageLog::recordDequeue (uint64_t messageId,
+                           uint64_t queueId,
+                           uint64_t transactionId)
+{
+    CLFS_LSN owningMsgLsn = idToLsn(messageId);
+    MessageDequeue record(queueId, transactionId);
+    write(&record, sizeof(record), &owningMsgLsn);
+}
+
+// Read the log from its base LSN forward and rebuild the stored messages
+// and their enqueue/dequeue history.
+//
+// @param recoverer   Used to recreate broker messages from the encoded
+//                    content found in the log.
+// @param messageMap  Filled in with id -> recovered message. Entries for
+//                    messages with a Message-Delete record are removed.
+// @param messageOps  Filled in with id -> enqueue/dequeue operations, in the
+//                    order read from the log. Operations that refer to a
+//                    message not present in messageMap are discarded.
+//
+// Throws std::runtime_error if a record has an unknown type or an
+// inconsistent length, and a Windows error exception if reading the log
+// stops for any reason other than reaching the end.
+void
+MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
+                    qpid::store::MessageMap& messageMap,
+                    std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps)
+{
+    // If context and content needs to be saved while reassembling messages
+    // split across log records, save the info and reassembly buffer. The
+    // buffer is a vector so the array is released correctly.
+    struct MessageBlocks {
+        uint32_t totalLength;
+        uint32_t soFarLength;
+        std::vector<char> content;
+
+        MessageBlocks() : totalLength(0), soFarLength(0) {}
+    };
+    // Ends the log read when this function exits, by return or exception.
+    // The context stays 0, and nothing is terminated, unless a read
+    // context was actually handed back.
+    struct ReadContextGuard {
+        PVOID context;
+
+        ReadContextGuard() : context(0) {}
+        ~ReadContextGuard() { if (context != 0) ::TerminateReadLog(context); }
+    };
+    std::map<uint64_t, MessageBlocks> reassemblies;
+    std::map<uint64_t, MessageBlocks>::iterator at;
+
+    //   Note that there may be message refs in the log which are deleted, so
+    //   be sure to only add msgs at message-start record, and ignore those
+    //   that don't have an existing message record.
+    // Get the base LSN - that's how to say "start reading at the beginning"
+    CLFS_INFORMATION info;
+    ULONG infoLength = sizeof (info);
+    BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+    QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+    // Pointers for the various record types that can be assigned in the
+    // reading loop below.
+    MessageStart *start;
+    MessageChunk *chunk;
+    MessageEnqueue *enqueue;
+    MessageDequeue *dequeue;
+
+    PVOID recordPointer;
+    ULONG recordLength;
+    CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+    CLFS_LSN messageLsn, current, undoNext;
+    ReadContextGuard reader;
+    uint64_t msgId;
+    // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+    // via a parameter.
+    current = info.BaseLsn;
+    ok = ::ReadLogRecord(marshal,
+                         &info.BaseLsn,
+                         ClfsContextForward,
+                         &recordPointer,
+                         &recordLength,
+                         &recordType,
+                         &undoNext,
+                         &messageLsn,
+                         &reader.context,
+                         0);
+    while (ok) {
+        // All the record types this class writes have a MessageEntryType in the
+        // beginning. Based on that, do what's needed.
+        MessageEntryType *t =
+            reinterpret_cast<MessageEntryType *>(recordPointer);
+        switch(*t) {
+        case MessageStartEntry:
+            start = reinterpret_cast<MessageStart *>(recordPointer);
+            msgId = lsnToId(current);
+            QPID_LOG(debug, "Message Start, id " << msgId);
+            // If the message content is split across multiple log records, save
+            // this content off to the side until the remaining record(s) are
+            // located.
+            if (start->totalLength == start->segmentLength) {  // Whole thing
+                qpid::framing::Buffer buff(start->content, start->totalLength);
+                qpid::broker::RecoverableMessage::shared_ptr m =
+                    recoverer.recoverMessage(buff);
+                m->setPersistenceId(msgId);
+                messageMap[msgId] = m;
+            }
+            else {
+                // A segment can't hold more than the whole message.
+                if (start->segmentLength > start->totalLength)
+                    throw std::runtime_error("Invalid message start length");
+                // Save it in a block big enough, built in place in the map
+                // to avoid copying the buffer. totalLength is nonzero here
+                // because it is larger than segmentLength.
+                MessageBlocks& b = reassemblies[msgId];
+                b.totalLength = start->totalLength;
+                b.soFarLength = start->segmentLength;
+                b.content.resize(b.totalLength);
+                memcpy_s(&b.content[0], b.totalLength,
+                         start->content, start->segmentLength);
+            }
+            break;
+        case MessageChunkEntry:
+            chunk = reinterpret_cast<MessageChunk *>(recordPointer);
+            // Remember, all entries chained to MessageStart via previous.
+            msgId = lsnToId(messageLsn);
+            QPID_LOG(debug, "Message Chunk for id " << msgId);
+            at = reassemblies.find(msgId);
+            if (at == reassemblies.end()) {
+                QPID_LOG(debug, "Message frag for " << msgId <<
+                                " but no start; discarded");
+            }
+            else {
+                MessageBlocks& b = at->second;
+                if (b.soFarLength + chunk->segmentLength > b.totalLength)
+                    throw std::runtime_error("Invalid message chunk length");
+                memcpy_s(&b.content[0] + b.soFarLength,
+                         b.totalLength - b.soFarLength,
+                         chunk->content,
+                         chunk->segmentLength);
+                b.soFarLength += chunk->segmentLength;
+                // Once the last piece arrives, recover the message and drop
+                // the reassembly state.
+                if (b.totalLength == b.soFarLength) {
+                    qpid::framing::Buffer buff(&b.content[0], b.totalLength);
+                    qpid::broker::RecoverableMessage::shared_ptr m =
+                        recoverer.recoverMessage(buff);
+                    m->setPersistenceId(msgId);
+                    messageMap[msgId] = m;
+                    reassemblies.erase(at);
+                }
+            }
+            break;
+        case MessageDeleteEntry:
+            msgId = lsnToId(messageLsn);
+            QPID_LOG(debug, "Message Delete, id " << msgId);
+            messageMap.erase(msgId);
+            messageOps.erase(msgId);
+            break;
+        case MessageEnqueueEntry:
+            enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer);
+            msgId = lsnToId(messageLsn);
+            QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " <<
+                            enqueue->queueId);
+            if (messageMap.find(msgId) == messageMap.end()) {
+                QPID_LOG(debug,
+                         "Message " << msgId << " doesn't exist; discarded");
+            }
+            else {
+                std::vector<RecoveredMsgOp>& ops = messageOps[msgId];
+                RecoveredMsgOp op(RECOVERED_ENQUEUE,
+                                  enqueue->queueId,
+                                  enqueue->transId);
+                ops.push_back(op);
+            }
+            break;
+        case MessageDequeueEntry:
+            dequeue = reinterpret_cast<MessageDequeue *>(recordPointer);
+            msgId = lsnToId(messageLsn);
+            QPID_LOG(debug, "Message " << msgId << " Dequeue from queue " <<
+                            dequeue->queueId);
+            if (messageMap.find(msgId) == messageMap.end()) {
+                QPID_LOG(debug,
+                         "Message " << msgId << " doesn't exist; discarded");
+            }
+            else {
+                std::vector<RecoveredMsgOp>& ops = messageOps[msgId];
+                RecoveredMsgOp op(RECOVERED_DEQUEUE,
+                                  dequeue->queueId,
+                                  dequeue->transId);
+                ops.push_back(op);
+            }
+            break;
+        default:
+            throw std::runtime_error("Bad message log entry type");
+        }
+
+        recordType = ClfsDataRecord;
+        ok = ::ReadNextLogRecord(reader.context,
+                                 &recordPointer,
+                                 &recordLength,
+                                 &recordType,
+                                 0,             // No userLsn
+                                 &undoNext,
+                                 &messageLsn,
+                                 &current,
+                                 0);
+    }
+    // Pick up the reason the read stopped before making any other call.
+    DWORD status = ::GetLastError();
+    if (status == ERROR_HANDLE_EOF)  // No more records
+        return;
+    throw QPID_WINDOWS_ERROR(status);
+}
+
+}}}  // namespace qpid::store::ms_clfs

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h?rev=1026175&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h Thu Oct 21 23:09:00 2010
@@ -0,0 +1,95 @@
+#ifndef QPID_STORE_MSCLFS_MESSAGELOG_H
+#define QPID_STORE_MSCLFS_MESSAGELOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/broker/RecoveryManager.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/store/StorageProvider.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class MessageLog
+ *
+ * Represents a CLFS-housed message log.
+ */
+class MessageLog : public Log {
+
+public:
+    // Kind of queueing operation read back from the log by recover().
+    enum RecoveredOpType { RECOVERED_ENQUEUE = 1, RECOVERED_DEQUEUE };
+
+    // A single enqueue or dequeue read back from the log: what happened,
+    // on which queue, and under which transaction.
+    struct RecoveredMsgOp {
+        RecoveredOpType op;
+        uint64_t queueId;
+        uint64_t txnId;
+
+        RecoveredMsgOp(RecoveredOpType o, const uint64_t& q, const uint64_t& t)
+            : op(o), queueId(q), txnId(t) {}
+    };
+
+    // Reimplements the method inherited from Log: works out the smallest
+    // marshalling buffer that can hold the records written by this class.
+    virtual uint32_t marshallingBufferSize();
+
+    // Store msg in the log. The return value is its persistence Id.
+    uint64_t add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+    // Log the deletion of messageId. A nonzero newFirstId identifies what is
+    // now the earliest valid message in the log, and the log's tail is
+    // advanced to it; pass 0 to leave the tail where it is.
+    void deleteMessage(uint64_t messageId, uint64_t newFirstId);
+
+    // Log a message's arrival on, or removal from, a queue. Either kind of
+    // operation can belong to a transaction; a transactionId of 0 means the
+    // operation has no transaction.
+    void recordEnqueue (uint64_t messageId,
+                        uint64_t queueId,
+                        uint64_t transactionId);
+    void recordDequeue (uint64_t messageId,
+                        uint64_t queueId,
+                        uint64_t transactionId);
+
+    // Read the log and rebuild the messages and their queueing history.
+    // @param recoverer  Recovery manager that turns encoded framing buffers
+    //                   from the log back into broker objects.
+    // @param messageMap Receives the id -> ptr entries for the recovered
+    //                   messages.
+    // @param messageOps Receives, for each msg id, the vector of recovered
+    //                   operations that involve the message, in the same
+    //                   order they were read from the log. Deciding which
+    //                   of those operations to act on is left to the
+    //                   caller.
+    void recover(qpid::broker::RecoveryManager& recoverer,
+                 qpid::store::MessageMap& messageMap,
+                 std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps);
+};
+
+}}}  // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_MESSAGELOG_H */

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org