You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/09/24 15:49:16 UTC

svn commit: r1389378 [1/2] - in /qpid/branches/asyncstore/cpp/src: ./ qpid/asyncStore/ qpid/broker/ qpid/ha/ qpid/management/ qpid/store/ qpid/xml/ tests/

Author: kpvdr
Date: Mon Sep 24 13:49:13 2012
New Revision: 1389378

URL: http://svn.apache.org/viewvc?rev=1389378&view=rev
Log:
QPID-3858: WIP: Provisional checkin: Wiring of async store interface to broker. Code compiles, but as persistent transactions are currentl disconnected, not all tests pass.

Added:
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.h
Modified:
    qpid/branches/asyncstore/cpp/src/CMakeLists.txt
    qpid/branches/asyncstore/cpp/src/asyncstore.cmake
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManager.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h
    qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.h
    qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.h
    qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp
    qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp
    qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h
    qpid/branches/asyncstore/cpp/src/tests/AsyncCompletion.cpp
    qpid/branches/asyncstore/cpp/src/tests/DtxWorkRecordTest.cpp
    qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp
    qpid/branches/asyncstore/cpp/src/tests/TxBufferTest.cpp
    qpid/branches/asyncstore/cpp/src/tests/test_store.cpp

Modified: qpid/branches/asyncstore/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/CMakeLists.txt?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/asyncstore/cpp/src/CMakeLists.txt Mon Sep 24 13:49:13 2012
@@ -1122,7 +1122,12 @@ set (qpidbroker_SOURCES
      qpid/broker/FifoDistributor.cpp
      qpid/broker/MessageGroupManager.cpp
      qpid/broker/PersistableMessage.cpp
+     qpid/broker/AsyncResultHandle.cpp
+     qpid/broker/AsyncResultHandleImpl.cpp
+     qpid/broker/AsyncResultQueueImpl.cpp
      qpid/broker/Bridge.cpp
+     qpid/broker/ConfigAsyncContext.cpp
+     qpid/broker/ConfigHandle.cpp
      qpid/broker/Connection.cpp
      qpid/broker/ConnectionHandler.cpp
      qpid/broker/ConnectionFactory.cpp
@@ -1145,9 +1150,9 @@ set (qpidbroker_SOURCES
      qpid/broker/MessageAdapter.cpp
      qpid/broker/MessageBuilder.cpp
      qpid/broker/MessageHandle.cpp
-     qpid/broker/MessageStoreModule.cpp
+#     qpid/broker/MessageStoreModule.cpp
      qpid/broker/NameGenerator.cpp
-     qpid/broker/NullMessageStore.cpp
+#     qpid/broker/NullMessageStore.cpp
      qpid/broker/QueueBindings.cpp
      qpid/broker/QueuedMessage.cpp
      qpid/broker/QueueCursor.cpp
@@ -1156,6 +1161,8 @@ set (qpidbroker_SOURCES
      qpid/broker/QueueRegistry.cpp
      qpid/broker/QueueSettings.cpp
      qpid/broker/QueueFlowLimit.cpp
+     qpid/broker/RecoveryAsyncContext.cpp
+     qpid/broker/RecoveryHandle.cpp
      qpid/broker/RecoveryManagerImpl.cpp
      qpid/broker/RecoveredEnqueue.cpp
      qpid/broker/RecoveredDequeue.cpp
@@ -1444,7 +1451,7 @@ add_definitions(-DBOOST_FILESYSTEM_VERSI
 # Now create the config file from all the info learned above.
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
                ${CMAKE_CURRENT_BINARY_DIR}/config.h)
-add_subdirectory(qpid/store)
+#add_subdirectory(qpid/store)
 add_subdirectory(tests)
 
 # Support for pkg-config

Modified: qpid/branches/asyncstore/cpp/src/asyncstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/asyncstore.cmake?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/asyncstore.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/asyncstore.cmake Mon Sep 24 13:49:13 2012
@@ -54,6 +54,7 @@ set (asyncStore_SOURCES
 	qpid/asyncStore/PersistableMessageContext.cpp
 	qpid/asyncStore/Plugin.cpp
 	qpid/asyncStore/QueueHandleImpl.cpp
+	qpid/asyncStore/RecoveryHandleImpl.cpp
 	qpid/asyncStore/RunState.cpp
 	qpid/asyncStore/TxnHandleImpl.cpp
     qpid/broker/AsyncResultHandle.cpp
@@ -65,6 +66,8 @@ set (asyncStore_SOURCES
 	qpid/broker/MessageHandle.cpp
 	qpid/broker/QueueAsyncContext.cpp
 	qpid/broker/QueueHandle.cpp
+    qpid/broker/RecoveryAsyncContext.cpp
+    qpid/broker/RecoveryHandle.cpp
     qpid/broker/SimpleDeliverable.cpp
     qpid/broker/SimpleDeliveryRecord.cpp
 	qpid/broker/SimpleMessage.cpp

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp Mon Sep 24 13:49:13 2012
@@ -25,6 +25,7 @@
 
 #include "qpid/broker/AsyncResultHandle.h"
 #include "qpid/broker/AsyncResultHandleImpl.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
 #include "qpid/broker/QueueAsyncContext.h"
 #include "qpid/broker/TxnAsyncContext.h"
 
@@ -132,6 +133,29 @@ AsyncOpTxnAbort::getOpStr() const {
 }
 
 
+// --- class AsyncOpRecover ---
+
+AsyncOpRecover::AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+                               boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt,
+                               qpid::broker::AsyncStore* store) :
+        AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(rcvrCtxt), store),
+        m_rcvrHandle(rcvrHandle)
+{}
+
+AsyncOpRecover::~AsyncOpRecover() {}
+
+void
+AsyncOpRecover::executeOp() const {
+    // TODO: Implement store operation here
+    submitResult();
+}
+
+const char*
+AsyncOpRecover::getOpStr() const {
+    return "RECOVER";
+}
+
+
 // --- class AsyncOpConfigCreate ---
 
 AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h Mon Sep 24 13:49:13 2012
@@ -90,6 +90,18 @@ private:
 };
 
 
+class AsyncOpRecover: public qpid::asyncStore::AsyncOperation {
+public:
+    AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+                   boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt,
+                   qpid::broker::AsyncStore* store);
+    virtual ~AsyncOpRecover();
+    virtual void executeOp() const;
+    virtual const char* getOpStr() const;
+private:
+    qpid::broker::RecoveryHandle& m_rcvrHandle;
+};
+
 class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
 public:
     AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp Mon Sep 24 13:49:13 2012
@@ -29,6 +29,7 @@
 #include "qpid/asyncStore/EventHandleImpl.h"
 #include "qpid/asyncStore/MessageHandleImpl.h"
 #include "qpid/asyncStore/QueueHandleImpl.h"
+#include "qpid/asyncStore/RecoveryHandleImpl.h"
 #include "qpid/asyncStore/TxnHandleImpl.h"
 #include "qpid/broker/ConfigHandle.h"
 #include "qpid/broker/EnqueueHandle.h"
@@ -36,6 +37,8 @@
 #include "qpid/broker/MessageHandle.h"
 #include "qpid/broker/QueueAsyncContext.h"
 #include "qpid/broker/QueueHandle.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
 #include "qpid/broker/TxnAsyncContext.h"
 #include "qpid/broker/TxnHandle.h"
 
@@ -48,12 +51,17 @@ AsyncStoreImpl::AsyncStoreImpl(boost::sh
         m_opts(opts),
         m_runState(),
         m_operations(m_poller)
-{}
+{
+    QPID_LOG(info, "AsyncStoreImpl::AsyncStoreImpl()");
+}
 
 AsyncStoreImpl::~AsyncStoreImpl() {}
 
 void
-AsyncStoreImpl::initialize() {}
+AsyncStoreImpl::initialize(bool truncateFlag,
+                           bool saveFlag) {
+    QPID_LOG(info, "AsyncStoreImpl::initialize() truncateFlag=" << (truncateFlag?"T":"F") << " saveFlag=" << (saveFlag?"T":"F"));
+}
 
 uint64_t
 AsyncStoreImpl::getNextRid() {
@@ -88,25 +96,42 @@ AsyncStoreImpl::createTxnHandle(const st
 
 void
 AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
-                              boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this));
-    TxnCtxt->setOpStr(op->getOpStr());
+                              boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) {
+    assert(txnCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, txnCtxt, this));
+    txnCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
 void
 AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
-                             boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this));
-    TxnCtxt->setOpStr(op->getOpStr());
+                             boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) {
+    assert(txnCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, txnCtxt, this));
+    txnCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
 void
 AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
-                            boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this));
-    TxnCtxt->setOpStr(op->getOpStr());
+                            boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) {
+    assert(txnCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, txnCtxt, this));
+    txnCtxt->setOpStr(op->getOpStr());
+    m_operations.submit(op);
+}
+
+qpid::broker::RecoveryHandle
+AsyncStoreImpl::createRecoveryHandle() {
+    return qpid::broker::RecoveryHandle(new RecoveryHandleImpl());
+}
+
+void
+AsyncStoreImpl::submitRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+                              boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt) {
+    assert(rcvrCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpRecover(rcvrHandle, rcvrCtxt, this));
+    rcvrCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
@@ -144,6 +169,7 @@ void
 AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
                              const qpid::broker::DataSource* const dataSrc,
                              boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+    assert(brokerCtxt.get() != 0);
     boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this));
     brokerCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
@@ -152,6 +178,7 @@ AsyncStoreImpl::submitCreate(qpid::broke
 void
 AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
                               boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+    assert(brokerCtxt.get() != 0);
     boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this));
     brokerCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
@@ -160,25 +187,28 @@ AsyncStoreImpl::submitDestroy(qpid::brok
 void
 AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
                              const qpid::broker::DataSource* const dataSrc,
-                             boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this));
-    QueueCtxt->setOpStr(op->getOpStr());
+                             boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+    assert(queueCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, queueCtxt, this));
+    queueCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
 void
 AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
-                              boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this));
-    QueueCtxt->setOpStr(op->getOpStr());
+                              boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+    assert(queueCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, queueCtxt, this));
+    queueCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
 void
 AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
-                            boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this));
-    QueueCtxt->setOpStr(op->getOpStr());
+                            boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+    assert(queueCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, queueCtxt, this));
+    queueCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
@@ -187,6 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broke
                              const qpid::broker::DataSource* const dataSrc,
                              qpid::broker::TxnHandle& txnHandle,
                              boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+    assert(brokerCtxt.get() != 0);
     boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this));
     brokerCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
@@ -196,6 +227,7 @@ void
 AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
                               qpid::broker::TxnHandle& txnHandle,
                               boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+    assert(brokerCtxt.get() != 0);
     boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this));
     brokerCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
@@ -204,28 +236,30 @@ AsyncStoreImpl::submitDestroy(qpid::brok
 void
 AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
                               qpid::broker::TxnHandle& txnHandle,
-                              boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this));
-    QueueCtxt->setOpStr(op->getOpStr());
+                              boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+    assert(queueCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, queueCtxt, this));
+    queueCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
 void
 AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
                               qpid::broker::TxnHandle& txnHandle,
-                              boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
-    boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this));
-    QueueCtxt->setOpStr(op->getOpStr());
+                              boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+    assert(queueCtxt.get() != 0);
+    boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, queueCtxt, this));
+    queueCtxt->setOpStr(op->getOpStr());
     m_operations.submit(op);
 }
 
-int
-AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
-                            qpid::broker::QueueHandle& /*queueHandle*/,
-                            char* /*data*/,
-                            uint64_t /*offset*/,
-                            const uint64_t /*length*/) {
-    return 0;
-}
+//int
+//AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
+//                            qpid::broker::QueueHandle& /*queueHandle*/,
+//                            char* /*data*/,
+//                            uint64_t /*offset*/,
+//                            const uint64_t /*length*/) {
+//    return 0;
+//}
 
 }} // namespace qpid::asyncStore

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h Mon Sep 24 13:49:13 2012
@@ -47,7 +47,7 @@ public:
     AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
                    const AsyncStoreOptions& opts);
     virtual ~AsyncStoreImpl();
-    void initialize();
+    void initialize(bool truncateFlag = false, bool saveFlag = true);
     uint64_t getNextRid(); // Global counter for journal RIDs
 
     // --- Management ---
@@ -65,11 +65,17 @@ public:
                                             qpid::broker::SimpleTxnBuffer* tb);
 
     void submitPrepare(qpid::broker::TxnHandle& txnHandle,
-                       boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt);
+                       boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt);
     void submitCommit(qpid::broker::TxnHandle& txnHandle,
-                      boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
+                      boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
     void submitAbort(qpid::broker::TxnHandle& txnHandle,
-                     boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
+                     boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
+
+
+    // --- Interface from AsyncRecoverable ---
+    qpid::broker::RecoveryHandle createRecoveryHandle();
+    void submitRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+                       boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt);
 
 
     // --- Interface from AsyncStore ---
@@ -112,12 +118,12 @@ public:
                        qpid::broker::TxnHandle& txnHandle,
                        boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
 
-    // Legacy - Restore FTD message, is NOT async!
-    virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
-                            qpid::broker::QueueHandle& queueHandle,
-                            char* data,
-                            uint64_t offset,
-                            const uint64_t length);
+//    // Legacy - Restore FTD message, is NOT async!
+//    virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
+//                            qpid::broker::QueueHandle& queueHandle,
+//                            char* data,
+//                            uint64_t offset,
+//                            const uint64_t length);
 
 private:
     boost::shared_ptr<qpid::sys::Poller> m_poller;

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h Mon Sep 24 13:49:13 2012
@@ -29,8 +29,7 @@
 namespace qpid {
 namespace asyncStore {
 
-class ConfigHandleImpl : public virtual qpid::RefCounted
-{
+class ConfigHandleImpl : public virtual qpid::RefCounted {
 public:
     ConfigHandleImpl();
     virtual ~ConfigHandleImpl();

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp Mon Sep 24 13:49:13 2012
@@ -50,6 +50,8 @@ OperationQueue::OpQueue::Batch::const_it
 OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) {
     try {
         for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+// DEBUG: kpvdr
+std::cout << "#### OperationQueue::handle(): op=" << (*i)->getOpStr() << std::endl << std::flush;
             (*i)->executeOp(); // Do store work here
         }
     } catch (const std::exception& e) {

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp Mon Sep 24 13:49:13 2012
@@ -43,7 +43,7 @@ Plugin::earlyInitialize(Target& target) 
     }
     m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options));
     boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store);
-    broker->setAsyncStore(brokerAsyncStore);
+    broker->setStore(brokerAsyncStore);
     boost::function<void()> fn = boost::bind(&Plugin::finalize, this);
     target.addFinalizer(fn);
     QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir);

Added: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp Mon Sep 24 13:49:13 2012
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryHandleImpl.cpp
+ */
+
+#include "RecoveryHandleImpl.h"
+
+namespace qpid {
+namespace asyncStore {
+
+RecoveryHandleImpl::RecoveryHandleImpl() {}
+
+RecoveryHandleImpl::~RecoveryHandleImpl() {}
+
+}} // namespace qpid::asyncStore

Added: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h Mon Sep 24 13:49:13 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoverHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_RecoveryHandleImpl_h_
+#define qpid_asyncStore_RecoveryHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace asyncStore {
+
+class RecoveryHandleImpl: public virtual qpid::RefCounted {
+public:
+    RecoveryHandleImpl();
+    virtual ~RecoveryHandleImpl();
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_RecoveryHandleImpl_h_

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h Mon Sep 24 13:49:13 2012
@@ -20,6 +20,7 @@
 #ifndef qpid_broker_AsyncStore_h_
 #define qpid_broker_AsyncStore_h_
 
+#include "qpid/broker/RecoveryManager.h"
 #include "qpid/types/Variant.h" // qpid::types::Variant::Map
 
 #include <boost/shared_ptr.hpp>
@@ -65,9 +66,12 @@ class EnqueueHandle;
 class EventHandle;
 class MessageHandle;
 class QueueHandle;
+class RecoveryHandle;
 class TxnHandle;
 
+class InitAsyncContext;
 class QueueAsyncContext;
+class RecoveryAsyncContext;
 class TpcTxnAsyncContext;
 class TxnAsyncContext;
 class SimpleTxnBuffer;
@@ -92,10 +96,20 @@ public:
                              boost::shared_ptr<TxnAsyncContext>) = 0;
 };
 
+class AsyncRecoverable {
+public:
+    virtual ~AsyncRecoverable() {}
+    virtual RecoveryHandle createRecoveryHandle() = 0;
+    virtual void submitRecover(qpid::broker::RecoveryHandle&,
+                               boost::shared_ptr<qpid::broker::RecoveryAsyncContext>) = 0;
+};
+
 // Subclassed by store:
-class AsyncStore : public AsyncTransactionalStore {
+class AsyncStore : public AsyncTransactionalStore,
+                   public AsyncRecoverable {
 public:
     virtual ~AsyncStore() {}
+    virtual void initialize(bool truncateFlag = false, bool saveFlag = true) = 0;
 
     // --- Factory methods for creating handles ---
 
@@ -144,12 +158,12 @@ public:
                                TxnHandle&,
                                boost::shared_ptr<QueueAsyncContext>) = 0;
 
-    // Legacy - Restore FTD message, is NOT async!
-    virtual int loadContent(MessageHandle&,
-                            QueueHandle&,
-                            char* data,
-                            uint64_t offset,
-                            const uint64_t length) = 0;
+//    // Legacy - Restore FTD message, is NOT async!
+//    virtual int loadContent(MessageHandle&,
+//                            QueueHandle&,
+//                            char* data,
+//                            uint64_t offset,
+//                            const uint64_t length) = 0;
 };
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp Mon Sep 24 13:49:13 2012
@@ -20,12 +20,16 @@
  */
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/ConfigAsyncContext.h"
+#include "qpid/broker/ConfigHandle.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/MessageStoreModule.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/MessageStoreModule.h"
+//#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/RecoveryAsyncContext.h"
 #include "qpid/broker/RecoveryManagerImpl.h"
 #include "qpid/broker/SaslAuthenticator.h"
 #include "qpid/broker/SecureConnectionFactory.h"
@@ -34,6 +38,8 @@
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
 #include "qpid/broker/MessageGroupManager.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
@@ -190,7 +196,9 @@ Broker::Broker(const Broker::Options& co
     managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
                                                           conf.qmf2Support)
                                     : 0),
-    store(new NullMessageStore),
+//    store(new NullMessageStore),
+//    asyncStore(0),
+    asyncResultQueue(poller),
     acl(0),
     dataDir(conf.noDataDir ? std::string() : conf.dataDir),
     queues(this),
@@ -270,23 +278,31 @@ Broker::Broker(const Broker::Options& co
     MessageGroupManager::setDefaults(conf.defaultMsgGroup);
 
     // If no plugin store module registered itself, set up the null store.
-    if (NullMessageStore::isNullStore(store.get()))
-        setStore();
+//    if (NullMessageStore::isNullStore(store.get()))
+//        setStore();
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
 
-    if (store.get() != 0) {
+//    if (store.get() != 0) {
+    if (asyncStore.get() != 0) {
         // The cluster plug-in will setRecovery(false) on all but the first
         // broker to join a cluster.
         if (getRecovery()) {
+            QPID_LOG(info, "Store recovery starting")
             RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
-            store->recover(recoverer);
+            RecoveryHandle rh = asyncStore->createRecoveryHandle();
+            boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+            asyncStore->submitRecover(rh, rac);
+//            store->recover(recoverer);
         }
         else {
             QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
-            store->truncateInit(true); // save old files in subdir
+//            store->truncateInit(true); // save old files in subdir
+            asyncStore->initialize(true, true);
         }
     }
+// debug
+    else QPID_LOG(info, ">>>> No store!!!!")
 
     //ensure standard exchanges exist (done after recovery from store)
     declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -357,10 +373,14 @@ Broker::Broker(const Broker::Options& co
 
 void Broker::declareStandardExchange(const std::string& name, const std::string& type)
 {
-    bool storeEnabled = store.get() != NULL;
+//    bool storeEnabled = store.get() != NULL;
+    bool storeEnabled = asyncStore.get() != NULL;
     std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
     if (status.second && storeEnabled) {
-        store->create(*status.first, framing::FieldTable ());
+//        store->create(*status.first, framing::FieldTable ());
+        ConfigHandle ch = asyncStore->createConfigHandle();
+        boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+        asyncStore->submitCreate(ch, status.first.get(), bc);
     }
 }
 
@@ -377,23 +397,39 @@ boost::intrusive_ptr<Broker> Broker::cre
     return boost::intrusive_ptr<Broker>(new Broker(opts));
 }
 
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
-{
-    store.reset(new MessageStoreModule (_store));
-    setStore();
-}
+//void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+//{
+//    store.reset(new MessageStoreModule (_store));
+//    setStore();
+//}
 
-void Broker::setAsyncStore(boost::shared_ptr<AsyncStore>& /*asyncStore*/)
+void Broker::setStore(boost::shared_ptr<AsyncStore>& _asyncStore)
 {
-    // TODO: Provide implementation for async store interface
+//    asyncStore.reset(_asyncStore.get());
+    asyncStore = _asyncStore;
+    setStore();
 }
 
 void Broker::setStore () {
-    queues.setStore     (store.get());
-    dtxManager.setStore (store.get());
-    links.setStore      (store.get());
+//    queues.setStore     (store.get());
+    queues.setStore(asyncStore.get());
+//    dtxManager.setStore (store.get());
+    dtxManager.setStore(asyncStore.get());
+//    links.setStore      (store.get());
+    links.setStore(asyncStore.get());
+}
+
+// static
+void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+    std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+// static
+void Broker::configureComplete(const AsyncResultHandle* const arh) {
+    std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
 }
 
+
 void Broker::run() {
     if (config.workerThreads > 0) {
         QPID_LOG(notice, "Broker running");
@@ -926,7 +962,7 @@ Manageable::status_t Broker::queryQueue(
         return Manageable::STATUS_UNKNOWN_OBJECT;
     }
     q->query( results );
-    return Manageable::STATUS_OK;;
+    return Manageable::STATUS_OK;
 }
 
 Manageable::status_t Broker::getTimestampConfig(bool& receive,
@@ -1168,7 +1204,10 @@ std::pair<Exchange::shared_ptr, bool> Br
             alternate->incAlternateUsers();
         }
         if (durable) {
-            store->create(*result.first, arguments);
+//            store->create(*result.first, arguments);
+            ConfigHandle ch = asyncStore->createConfigHandle();
+            boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+            asyncStore->submitCreate(ch, result.first.get(), bc);
         }
         if (managementAgent.get()) {
             //TODO: debatable whether we should raise an event here for
@@ -1208,7 +1247,11 @@ void Broker::deleteExchange(const std::s
     Exchange::shared_ptr exchange(exchanges.get(name));
     if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
     if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
-    if (exchange->isDurable()) store->destroy(*exchange);
+//    if (exchange->isDurable()) store->destroy(*exchange);
+    if (exchange->isDurable()) {
+//        boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+//        asyncStore->submitDestroy(exchange.getHandle(), bc);
+    }
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
     exchanges.destroy(name);
 
@@ -1285,7 +1328,8 @@ void Broker::unbind(const std::string& q
     } else {
         if (exchange->unbind(queue, key, 0)) {
             if (exchange->isDurable() && queue->isDurable()) {
-                store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+//                store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+                // TODO: kpvdr: Async config destroy here
             }
             getConfigurationObservers().unbind(
                 exchange, queue, key, framing::FieldTable());

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h Mon Sep 24 13:49:13 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include "qpid/broker/AsyncResultQueueImpl.h"
 #include "qpid/broker/AsyncStore.h"
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/ConnectionFactory.h"
@@ -29,7 +30,7 @@
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/DtxManager.h"
 #include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/SessionManager.h"
@@ -138,6 +139,8 @@ class Broker : public sys::Runnable, pub
 
     void declareStandardExchange(const std::string& name, const std::string& type);
     void setStore ();
+    static void recoverComplete(const AsyncResultHandle* const);
+    static void configureComplete(const AsyncResultHandle* const);
     void setLogLevel(const std::string& level);
     std::string getLogLevel();
     void createObject(const std::string& type, const std::string& name,
@@ -160,7 +163,10 @@ class Broker : public sys::Runnable, pub
     Options config;
     std::auto_ptr<management::ManagementAgent> managementAgent;
     ProtocolFactoryMap protocolFactories;
-    std::auto_ptr<MessageStore> store;
+//    std::auto_ptr<MessageStore> store;
+//    std::auto_ptr<AsyncStore> asyncStore;
+    boost::shared_ptr<AsyncStore> asyncStore;
+    AsyncResultQueueImpl asyncResultQueue;
     AclModule* acl;
     DataDir dataDir;
     ConnectionObservers connectionObservers;
@@ -213,9 +219,10 @@ class Broker : public sys::Runnable, pub
     /** Shut down the broker */
     QPID_BROKER_EXTERN virtual void shutdown();
 
-    QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
-    void setAsyncStore(boost::shared_ptr<AsyncStore>& asyncStore);
-    MessageStore& getStore() { return *store; }
+//    QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
+    void setStore(boost::shared_ptr<AsyncStore>& asyncStore);
+//    MessageStore& getStore() { return *store; }
+    AsyncStore& getStore() { return *asyncStore; }
     void setAcl (AclModule* _acl) {acl = _acl;}
     AclModule* getAcl() { return acl; }
     QueueRegistry& getQueues() { return queues; }

Added: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp Mon Sep 24 13:49:13 2012
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigAsyncContext.cpp
+ */
+
+#include "ConfigAsyncContext.h"
+
+namespace qpid {
+namespace broker {
+
+ConfigAsyncContext::ConfigAsyncContext(AsyncResultCallback rcb,
+                                       AsyncResultQueue* const arq) :
+        m_rcb(rcb),
+        m_arq(arq)
+{}
+
+ConfigAsyncContext::~ConfigAsyncContext() {}
+
+AsyncResultQueue*
+ConfigAsyncContext::getAsyncResultQueue() const {
+    return m_arq;
+}
+
+void
+ConfigAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const {
+    if (m_rcb) {
+        m_rcb(arh);
+    }
+}
+
+}} // namespace qpid

Added: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h Mon Sep 24 13:49:13 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigAsyncContext.h
+ */
+
+#ifndef qpid_broker_ConfigAsyncContext_h_
+#define qpid_broker_ConfigAsyncContext_h_
+
+#include "AsyncStore.h"
+
+namespace qpid {
+namespace broker {
+class AsyncResultHandle;
+class AsyncResultQueue;
+
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
+class ConfigAsyncContext: public qpid::broker::BrokerAsyncContext
+{
+public:
+    ConfigAsyncContext(AsyncResultCallback rcb,
+                       AsyncResultQueue* const arq);
+    virtual ~ConfigAsyncContext();
+    virtual AsyncResultQueue* getAsyncResultQueue() const;
+    virtual void invokeCallback(const AsyncResultHandle* const) const;
+
+private:
+    AsyncResultCallback m_rcb;
+    AsyncResultQueue* const m_arq;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_ConfigAsyncContext_h_

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp Mon Sep 24 13:49:13 2012
@@ -199,3 +199,10 @@ bool DirectExchange::isBound(Queue::shar
 DirectExchange::~DirectExchange() {}
 
 const std::string DirectExchange::typeName("direct");
+
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t DirectExchange::getSize() {
+    return 0;
+}
+void DirectExchange::write(char* /*target*/) {}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h Mon Sep 24 13:49:13 2012
@@ -65,6 +65,11 @@ public:
     QPID_BROKER_EXTERN virtual ~DirectExchange();
 
     virtual bool supportsDynamicBinding() { return true; }
+
+    // DataSource interface - used to write persistence data to async store
+    uint64_t getSize();
+    void write(char* target);
+
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp Mon Sep 24 13:49:13 2012
@@ -35,7 +35,8 @@ using qpid::ptr_map_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+//DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : asyncTxnStore(0), timer(&t) {}
 
 DtxManager::~DtxManager() {}
 
@@ -124,7 +125,8 @@ DtxWorkRecord* DtxManager::createWork(co
         throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)"));
     } else {
         std::string ncxid = xid; // Work around const correctness problems in ptr_map.
-        return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+//        return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+        return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, asyncTxnStore)).first);
     }
 }
 
@@ -172,9 +174,11 @@ void DtxManager::DtxCleanup::fire()
     }
 }
 
-void DtxManager::setStore (TransactionalStore* _store)
+//void DtxManager::setStore (TransactionalStore* _store)
+void DtxManager::setStore (AsyncTransactionalStore* _ats)
 {
-    store = _store;
+//    store = _store;
+    asyncTxnStore = _ats;
 }
 
 std::string DtxManager::convert(const qpid::framing::Xid& xid)

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h Mon Sep 24 13:49:13 2012
@@ -22,9 +22,10 @@
 #define _DtxManager_
 
 #include <boost/ptr_container/ptr_map.hpp>
+#include "qpid/broker/AsyncStore.h"
 #include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/DtxWorkRecord.h"
-#include "qpid/broker/TransactionalStore.h"
+//#include "qpid/broker/TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/Xid.h"
 #include "qpid/sys/Mutex.h"
@@ -46,7 +47,8 @@ class DtxManager{
     };
 
     WorkMap work;
-    TransactionalStore* store;
+//    TransactionalStore* store;
+    AsyncTransactionalStore* asyncTxnStore;
     qpid::sys::Mutex lock;
     qpid::sys::Timer* timer;
 
@@ -65,7 +67,8 @@ public:
     void setTimeout(const std::string& xid, uint32_t secs);
     uint32_t getTimeout(const std::string& xid);
     void timedout(const std::string& xid);
-    void setStore(TransactionalStore* store);
+//    void setStore(TransactionalStore* store);
+    void setStore(AsyncTransactionalStore* ats);
     void setTimer(sys::Timer& t) { timer = &t; }
 
     // Used by cluster for replication.

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp Mon Sep 24 13:49:13 2012
@@ -29,8 +29,8 @@ using qpid::sys::Mutex;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
-    xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, AsyncTransactionalStore* const _ats) :
+    xid(_xid), asyncTxnStore(_ats), completed(false), rolledback(false), prepared(false), expired(false) {}
 
 DtxWorkRecord::~DtxWorkRecord()
 {
@@ -43,14 +43,15 @@ bool DtxWorkRecord::prepare()
 {
     Mutex::ScopedLock locker(lock);
     if (check()) {
-        txn = store->begin(xid);
-        if (prepare(txn.get())) {
-            store->prepare(*txn);
-            prepared = true;
-        } else {
-            abort();
-            //TODO: this should probably be flagged as internal error
-        }
+//        txn = asyncTxnStore->begin(xid);
+//        if (prepare(txn.get())) {
+//            asyncTxnStore->prepare(*txn);
+//            prepared = true;
+//        } else {
+//            abort();
+//            //TODO: this should probably be flagged as internal error
+//        }
+        // TODO: kpvdr: Async transaction prepare here
     } else {
         //some part of the work has been marked rollback only
         abort();
@@ -67,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionC
     return succeeded;
 }
 
-bool DtxWorkRecord::commit(bool onePhase)
+bool DtxWorkRecord::commit(bool onePhase) // why is onePhase necessary if prepared already contains the necessary state?
 {
     Mutex::ScopedLock locker(lock);
     if (check()) {
@@ -77,7 +78,8 @@ bool DtxWorkRecord::commit(bool onePhase
                 throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been prepared, one-phase option not valid!"));
             }
 
-            store->commit(*txn);
+//            asyncTxnStore->commit(*txn);
+            // TODO: kpvdr: Async transaction commit here
             txn.reset();
 
             std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
@@ -87,17 +89,20 @@ bool DtxWorkRecord::commit(bool onePhase
             if (!onePhase) {
                 throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has not been prepared, one-phase option required!"));
             }
-            std::auto_ptr<TransactionContext> localtxn = store->begin();
-            if (prepare(localtxn.get())) {
-                store->commit(*localtxn);
-                std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
-                return true;
-            } else {
-                store->abort(*localtxn);
-                abort();
-                //TODO: this should probably be flagged as internal error
-                return false;
-            }
+//            std::auto_ptr<TransactionContext> localtxn = asyncTxnStore->begin();
+//            if (prepare(localtxn.get())) {
+//                asyncTxnStore->commit(*localtxn);
+//                std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+//                return true;
+//            } else {
+//                asyncTxnStore->abort(*localtxn);
+//                abort();
+//                //TODO: this should probably be flagged as internal error
+//                return false;
+//            }
+            // TODO: kpvdr: Local transaction async prepare and commit here
+            // temp return value:
+            return true;
         }
     } else {
         //some part of the work has been marked rollback only
@@ -147,7 +152,8 @@ bool DtxWorkRecord::check()
 void DtxWorkRecord::abort()
 {
     if (txn.get()) {
-        store->abort(*txn);
+//        asyncTxnStore->abort(*txn);
+        // TODO: kpvdr: Async transaction abore here
         txn.reset();
     }
     std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h Mon Sep 24 13:49:13 2012
@@ -21,6 +21,7 @@
 #ifndef _DtxWorkRecord_
 #define _DtxWorkRecord_
 
+#include "qpid/broker/AsyncStore.h"
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/DtxTimeout.h"
@@ -48,7 +49,8 @@ class DtxWorkRecord
     typedef std::vector<DtxBuffer::shared_ptr> Work;
 
     const std::string xid;
-    TransactionalStore* const store;
+//    TransactionalStore* const store;
+    AsyncTransactionalStore* const asyncTxnStore;
     bool completed;
     bool rolledback;
     bool prepared;
@@ -63,7 +65,8 @@ class DtxWorkRecord
     bool prepare(TransactionContext* txn);
 public:
     QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid,
-                                     TransactionalStore* const store);
+//                                     TransactionalStore* const store);
+                                       AsyncTransactionalStore* const store);
     QPID_BROKER_EXTERN ~DtxWorkRecord();
     QPID_BROKER_EXTERN bool prepare();
     QPID_BROKER_EXTERN bool commit(bool onePhase);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Mon Sep 24 13:49:13 2012
@@ -23,10 +23,11 @@
  */
 
 #include <boost/shared_ptr.hpp>
+#include <qpid/broker/AsyncStore.h>
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
 #include "qpid/broker/PersistableExchange.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Mutex.h"
@@ -35,13 +36,15 @@
 #include "qmf/org/apache/qpid/broker/Binding.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
 
+#include <set>
+
 namespace qpid {
 namespace broker {
 
 class Broker;
 class ExchangeRegistry;
 
-class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable {
 public:
     struct Binding : public management::Manageable {
         typedef boost::shared_ptr<Binding>       shared_ptr;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h Mon Sep 24 13:49:13 2012
@@ -24,7 +24,7 @@
 
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/Exchange.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/management/Manageable.h"

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp Mon Sep 24 13:49:13 2012
@@ -120,3 +120,11 @@ bool FanOutExchange::isBound(Queue::shar
 FanOutExchange::~FanOutExchange() {}
 
 const std::string FanOutExchange::typeName("fanout");
+
+
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t FanOutExchange::getSize() {
+    return 0;
+}
+void FanOutExchange::write(char* /*target*/) {}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h Mon Sep 24 13:49:13 2012
@@ -62,6 +62,11 @@ class FanOutExchange : public virtual Ex
 
     QPID_BROKER_EXTERN virtual ~FanOutExchange();
     virtual bool supportsDynamicBinding() { return true; }
+
+    // DataSource interface - used to write persistence data to async store
+    uint64_t getSize();
+    void write(char* target);
+
 };
 
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp Mon Sep 24 13:49:13 2012
@@ -418,3 +418,9 @@ bool HeadersExchange::FedUnbindModifier:
     return true;
 }
 
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t HeadersExchange::getSize() {
+    return 0;
+}
+void HeadersExchange::write(char* /*target*/) {}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h Mon Sep 24 13:49:13 2012
@@ -107,6 +107,11 @@ class HeadersExchange : public virtual E
 
     static QPID_BROKER_EXTERN bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
     static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+
+    // DataSource interface - used to write persistence data to async store
+    uint64_t getSize();
+    void write(char* target);
+
 };
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp Mon Sep 24 13:49:13 2012
@@ -115,6 +115,11 @@ public:
         link = _link;
     }
 
+    // DataSource interface - used to write persistence data to async store
+    uint64_t getSize() { return 0; }
+    void write(char* /*target*/) {}
+
+
 private:
     Link *link;
 };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp Mon Sep 24 13:49:13 2012
@@ -42,7 +42,8 @@ namespace _qmf = qmf::org::apache::qpid:
 // factored: The persistence element should be factored separately
 LinkRegistry::LinkRegistry () :
     broker(0),
-    parent(0), store(0), passive(false),
+//    parent(0), store(0), passive(false),
+    parent(0), asyncStore(0), passive(false),
     realm("")
 {
 }
@@ -59,7 +60,7 @@ class LinkRegistryConnectionObserver : p
 
 LinkRegistry::LinkRegistry (Broker* _broker) :
     broker(_broker),
-    parent(0), store(0), passive(false),
+    parent(0), asyncStore(0), passive(false),
     realm(broker->getOptions().realm)
 {
     broker->getConnectionObservers().add(
@@ -117,7 +118,11 @@ pair<Link::shared_ptr, bool> LinkRegistr
                       boost::bind(&LinkRegistry::linkDestroyed, this, _1),
                       durable, authMechanism, username, password, broker,
                       parent, failover));
-        if (durable && store) store->create(*link);
+//        if (durable && store) store->create(*link);
+        if (durable && asyncStore) {
+//            store->create(*link);
+             // TODO: kpvdr: async create config (link)
+        }
         links[name] = link;
         pendingLinks[name] = link;
         QPID_LOG(debug, "Creating new link; name=" << name );
@@ -213,8 +218,11 @@ pair<Bridge::shared_ptr, bool> LinkRegis
                        args, init, queueName, altExchange));
         bridges[name] = bridge;
         link.add(bridge);
-        if (durable && store)
-            store->create(*bridge);
+//        if (durable && store)
+        if (durable && asyncStore) {
+//            store->create(*bridge);
+            // TODO: kpvdr: Async create config (bridge)
+        }
 
         QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
                  "' from " << src << " to " << dest << " (" << key << ")");
@@ -234,8 +242,11 @@ void LinkRegistry::linkDestroyed(Link *l
     LinkMap::iterator i = links.find(link->getName());
     if (i != links.end())
     {
-        if (i->second->isDurable() && store)
-            store->destroy(*(i->second));
+//        if (i->second->isDurable() && store)
+        if (i->second->isDurable() && asyncStore) {
+//            store->destroy(*(i->second));
+            // TODO: kpvdr: Async destroy config (link)
+        }
         links.erase(i);
     }
 }
@@ -254,18 +265,22 @@ void LinkRegistry::destroyBridge(Bridge 
     if (link) {
         link->cancel(b->second);
     }
-    if (b->second->isDurable())
-        store->destroy(*(b->second));
+//    if (b->second->isDurable())
+    if (b->second->isDurable()) {
+//        store->destroy(*(b->second));
+        // TODO: kpvdr: Async destroy config (bridge)
+    }
     bridges.erase(b);
 }
 
-void LinkRegistry::setStore (MessageStore* _store)
-{
-    store = _store;
+//void LinkRegistry::setStore (MessageStore* _store)
+void LinkRegistry::setStore (AsyncStore* _asyncStore) {
+    asyncStore = _asyncStore;
 }
 
-MessageStore* LinkRegistry::getStore() const {
-    return store;
+//MessageStore* LinkRegistry::getStore() const {
+AsyncStore* LinkRegistry::getStore() const {
+    return asyncStore;
 }
 
 namespace {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h Mon Sep 24 13:49:13 2012
@@ -25,7 +25,7 @@
 #include <map>
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/Bridge.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
 #include "qpid/Address.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/management/Manageable.h"
@@ -52,7 +52,8 @@ namespace broker {
         qpid::sys::Mutex lock;
         Broker* broker;
         management::Manageable* parent;
-        MessageStore* store;
+//        MessageStore* store;
+        AsyncStore* asyncStore;
         bool passive;
         std::string realm;
 
@@ -130,12 +131,14 @@ namespace broker {
         /**
          * Set the store to use.  May only be called once.
          */
-        QPID_BROKER_EXTERN void setStore (MessageStore*);
+//        QPID_BROKER_EXTERN void setStore (MessageStore*);
+        QPID_BROKER_EXTERN void setStore (AsyncStore*);
 
         /**
          * Return the message store used.
          */
-        QPID_BROKER_EXTERN MessageStore* getStore() const;
+//        QPID_BROKER_EXTERN MessageStore* getStore() const;
+        QPID_BROKER_EXTERN AsyncStore* getStore() const;
 
         QPID_BROKER_EXTERN std::string getAuthMechanism   (const std::string& key);
         QPID_BROKER_EXTERN std::string getAuthCredentials (const std::string& key);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp Mon Sep 24 13:49:13 2012
@@ -33,8 +33,10 @@ bool isLowerPriorityThan(uint8_t priorit
 }
 }
 
-LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
-    : Queue(n, s, ms, p, b) {}
+//LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+//    : Queue(n, s, ms, p, b) {}
+LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+    : Queue(n, s, as, p, b) {}
 
 bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h Mon Sep 24 13:49:13 2012
@@ -32,7 +32,8 @@ namespace broker {
 class LossyQueue : public Queue
 {
   public:
-    LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+//    LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+    LossyQueue(const std::string&, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*);
     bool checkDepth(const QueueDepth& increment, const Message&);
   private:
 };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp Mon Sep 24 13:49:13 2012
@@ -25,8 +25,10 @@
 
 namespace qpid {
 namespace broker {
-Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
-    : Queue(n, s, ms, p, b), messageMap(*m)
+//Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+//    : Queue(n, s, ms, p, b), messageMap(*m)
+Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+    : Queue(n, s, as, p, b), messageMap(*m)
 {
     messages = m;
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h Mon Sep 24 13:49:13 2012
@@ -35,7 +35,8 @@ class MessageMap;
 class Lvq : public Queue
 {
   public:
-    Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+//    Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+    Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*);
     void push(Message& msg, bool isRecovery=false);
   private:
     MessageMap& messageMap;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h Mon Sep 24 13:49:13 2012
@@ -18,6 +18,9 @@
  * under the License.
  *
  */
+
+#error "deprecated in favor of AsyncStore"
+
 #ifndef _MessageStore_
 #define _MessageStore_
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp Mon Sep 24 13:49:13 2012
@@ -19,6 +19,8 @@
  *
  */
 
+#error "deprecated in favor of AsyncStore"
+
 #include "qpid/broker/MessageStoreModule.h"
 #include "qpid/broker/NullMessageStore.h"
 #include <iostream>

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h Mon Sep 24 13:49:13 2012
@@ -18,6 +18,9 @@
  * under the License.
  *
  */
+
+#error "deprecated in favor of AsyncStore"
+
 #ifndef _MessageStoreModule_
 #define _MessageStoreModule_
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp Mon Sep 24 13:49:13 2012
@@ -19,6 +19,8 @@
  *
  */
 
+#error "deprecated in favor of AsyncStore"
+
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/MessageStoreModule.h"
 #include "qpid/broker/RecoveryManager.h"

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h Mon Sep 24 13:49:13 2012
@@ -18,6 +18,9 @@
  * under the License.
  *
  */
+
+#error "deprected in favor of AsyncStore"
+
 #ifndef _NullMessageStore_
 #define _NullMessageStore_
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp Mon Sep 24 13:49:13 2012
@@ -64,19 +64,19 @@ void PersistableMessage::flush()
     //TODO: is this really the right place for this?
 }
 
-// deprecated
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
-{
-    enqueueStart();
-}
+//// deprecated
+//void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+//{
+//    enqueueStart();
+//}
 
 void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*)
 {
     enqueueStart();
 }
 
-// deprecated
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
+//// deprecated
+//void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
 
 void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {}
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h Mon Sep 24 13:49:13 2012
@@ -80,16 +80,16 @@ class PersistableMessage : public Persis
     QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
     QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
 
-    QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
-                                         MessageStore* _store);
+//    QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
+//                                         MessageStore* _store);
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          AsyncStore* _store);
 
 
     QPID_BROKER_EXTERN bool isDequeueComplete();
     QPID_BROKER_EXTERN void dequeueComplete();
-    QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
-                                         MessageStore* _store);
+//    QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
+//                                         MessageStore* _store);
     QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
                                          AsyncStore* _store);
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp Mon Sep 24 13:49:13 2012
@@ -26,11 +26,11 @@
 #include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
 #include "qpid/broker/MessageDeque.h"
 #include "qpid/broker/MessageDistributor.h"
 #include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
 
 //TODO: get rid of this
@@ -165,12 +165,14 @@ void Queue::TxPublish::rollback() throw(
 }
 
 Queue::Queue(const string& _name, const QueueSettings& _settings,
-             MessageStore* const _store,
+//             MessageStore* const _store,
+             AsyncStore* const _asyncStore,
              Manageable* parent,
              Broker* b) :
 
     name(_name),
-    store(_store),
+//    store(_store),
+    asyncStore(_asyncStore),
     owner(0),
     consumerCount(0),
     browserCount(0),
@@ -198,9 +200,11 @@ Queue::Queue(const string& _name, const 
         ManagementAgent* agent = broker->getManagementAgent();
 
         if (agent != 0) {
-            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+//            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete);
             mgmtObject->set_arguments(settings.asMap());
-            agent->addObject(mgmtObject, 0, store != 0);
+//            agent->addObject(mgmtObject, 0, store != 0);
+            agent->addObject(mgmtObject, 0, asyncStore != 0);
             brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
             if (brokerMgmtObject)
                 brokerMgmtObject->inc_queueCount();
@@ -787,7 +791,7 @@ void Queue::setLastNodeFailure()
  * return true if enqueue succeeded and message should be made
  * available; returning false will result in the message being dropped
  */
-bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
+bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
@@ -807,13 +811,16 @@ bool Queue::enqueue(TransactionContext* 
         msg.addTraceId(settings.traceId);
     }
 
-    if (msg.isPersistent() && store) {
+//    if (msg.isPersistent() && store) {
+    if (msg.isPersistent() && asyncStore) {
         // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
         // when it considers the message stored.
         boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
         assert(pmsg);
-        pmsg->enqueueAsync(shared_from_this(), store);
-        store->enqueue(ctxt, pmsg, *this);
+//        pmsg->enqueueAsync(shared_from_this(), store);
+        pmsg->enqueueAsync(shared_from_this(), asyncStore);
+//        store->enqueue(ctxt, pmsg, *this);
+        // TODO - kpvdr: async enqueue here
     }
     return true;
 }
@@ -858,8 +865,10 @@ void Queue::dequeueCommited(const Messag
 void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
 {
     ScopedUse u(barrier);
-    if (u.acquired && msg && store) {
-        store->dequeue(0, msg, *this);
+//    if (u.acquired && msg && store) {
+    if (u.acquired && msg && asyncStore) {
+//        store->dequeue(0, msg, *this);
+        // TODO: kpvdr: async dequeue here
     }
 }
 
@@ -881,8 +890,10 @@ void Queue::dequeue(TransactionContext* 
             return;
         }
     }
-    if (store && pmsg) {
-        store->dequeue(ctxt, pmsg, *this);
+//    if (store && pmsg) {
+    if (asyncStore && pmsg) {
+//        store->dequeue(ctxt, pmsg, *this);
+        // TODO: kpvdr: async dequeue here
     }
 }
 
@@ -983,8 +994,10 @@ void Queue::observeConsumerRemove( const
 
 void Queue::create()
 {
-    if (store) {
-        store->create(*this, settings.storeSettings);
+//    if (store) {
+    if (asyncStore) {
+//        store->create(*this, settings.storeSettings);
+        // TODO: kpvdr: async store create here
     }
 }
 
@@ -1051,11 +1064,16 @@ void Queue::destroyed()
         alternateExchange->decAlternateUsers();
     }
 
-    if (store) {
+//    if (store) {
+    if (asyncStore) {
         barrier.destroy();
-        store->flush(*this);
-        store->destroy(*this);
-        store = 0;//ensure we make no more calls to the store for this queue
+//        store->flush(*this);
+        // TODO: kpvdr: async flush here
+//        store->destroy(*this);
+        // TODO: kpvdr: async destroy here
+//        store = 0;//ensure we make no more calls to the store for this queue
+        // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
+        // will cause store to be destroyed when all outstanding async ops are complete.
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
     notifyDeleted();
@@ -1444,7 +1462,9 @@ void Queue::removeObserver(boost::shared
 void Queue::flush()
 {
     ScopedUse u(barrier);
-    if (u.acquired && store) store->flush(*this);
+//    if (u.acquired && store) store->flush(*this);
+    // TODO: kpvdr: Async store flush here
+    if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
 }
 
 
@@ -1454,7 +1474,8 @@ bool Queue::bind(boost::shared_ptr<Excha
     if (exchange->bind(shared_from_this(), key, &arguments)) {
         bound(exchange->getName(), key, arguments);
         if (exchange->isDurable() && isDurable()) {
-            store->bind(*exchange, *this, key, arguments);
+//            store->bind(*exchange, *this, key, arguments);
+            // TODO: kpvdr: Store configuration here
         }
         return true;
     } else {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h Mon Sep 24 13:49:13 2012
@@ -59,7 +59,7 @@ namespace qpid {
 namespace broker {
 class Broker;
 class Exchange;
-class MessageStore;
+//class MessageStore;
 class QueueDepth;
 class QueueEvents;
 class QueueRegistry;
@@ -115,7 +115,8 @@ class Queue : public boost::enable_share
     typedef boost::function1<void, Message&> MessageFunctor;
 
     const std::string name;
-    MessageStore* store;
+//    MessageStore* store;
+    AsyncStore* asyncStore;
     const OwnershipToken* owner;
     uint32_t consumerCount;     // Actually a count of all subscriptions, acquiring or not.
     uint32_t browserCount;      // Count of non-acquiring subscriptions.
@@ -201,7 +202,8 @@ class Queue : public boost::enable_share
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
                              const QueueSettings& settings = QueueSettings(),
-                             MessageStore* const store = 0,
+//                             MessageStore* const store = 0,
+                             AsyncStore* const asyncStore = 0,
                              management::Manageable* parent = 0,
                              Broker* broker = 0);
     QPID_BROKER_EXTERN virtual ~Queue();
@@ -286,7 +288,8 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
     QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
     QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
-    inline bool isDurable() const { return store != 0; }
+//    inline bool isDurable() const { return store != 0; }
+    inline bool isDurable() const { return asyncStore != 0; }
     inline const QueueSettings& getSettings() const { return settings; }
     inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
     inline bool isAutoDelete() const { return settings.autodelete; }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org