You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/09/24 15:49:16 UTC
svn commit: r1389378 [1/2] - in /qpid/branches/asyncstore/cpp/src: ./
qpid/asyncStore/ qpid/broker/ qpid/ha/ qpid/management/ qpid/store/
qpid/xml/ tests/
Author: kpvdr
Date: Mon Sep 24 13:49:13 2012
New Revision: 1389378
URL: http://svn.apache.org/viewvc?rev=1389378&view=rev
Log:
QPID-3858: WIP: Provisional checkin: Wiring of async store interface to broker. Code compiles, but as persistent transactions are currentl disconnected, not all tests pass.
Added:
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h
qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.h
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.h
Modified:
qpid/branches/asyncstore/cpp/src/CMakeLists.txt
qpid/branches/asyncstore/cpp/src/asyncstore.cmake
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp
qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h
qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h
qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h
qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h
qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h
qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h
qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h
qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h
qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h
qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h
qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h
qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h
qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h
qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.h
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h
qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManager.h
qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h
qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.h
qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h
qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h
qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.h
qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.h
qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.h
qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp
qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp
qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h
qpid/branches/asyncstore/cpp/src/tests/AsyncCompletion.cpp
qpid/branches/asyncstore/cpp/src/tests/DtxWorkRecordTest.cpp
qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp
qpid/branches/asyncstore/cpp/src/tests/TxBufferTest.cpp
qpid/branches/asyncstore/cpp/src/tests/test_store.cpp
Modified: qpid/branches/asyncstore/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/CMakeLists.txt?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/asyncstore/cpp/src/CMakeLists.txt Mon Sep 24 13:49:13 2012
@@ -1122,7 +1122,12 @@ set (qpidbroker_SOURCES
qpid/broker/FifoDistributor.cpp
qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
+ qpid/broker/AsyncResultHandle.cpp
+ qpid/broker/AsyncResultHandleImpl.cpp
+ qpid/broker/AsyncResultQueueImpl.cpp
qpid/broker/Bridge.cpp
+ qpid/broker/ConfigAsyncContext.cpp
+ qpid/broker/ConfigHandle.cpp
qpid/broker/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/ConnectionFactory.cpp
@@ -1145,9 +1150,9 @@ set (qpidbroker_SOURCES
qpid/broker/MessageAdapter.cpp
qpid/broker/MessageBuilder.cpp
qpid/broker/MessageHandle.cpp
- qpid/broker/MessageStoreModule.cpp
+# qpid/broker/MessageStoreModule.cpp
qpid/broker/NameGenerator.cpp
- qpid/broker/NullMessageStore.cpp
+# qpid/broker/NullMessageStore.cpp
qpid/broker/QueueBindings.cpp
qpid/broker/QueuedMessage.cpp
qpid/broker/QueueCursor.cpp
@@ -1156,6 +1161,8 @@ set (qpidbroker_SOURCES
qpid/broker/QueueRegistry.cpp
qpid/broker/QueueSettings.cpp
qpid/broker/QueueFlowLimit.cpp
+ qpid/broker/RecoveryAsyncContext.cpp
+ qpid/broker/RecoveryHandle.cpp
qpid/broker/RecoveryManagerImpl.cpp
qpid/broker/RecoveredEnqueue.cpp
qpid/broker/RecoveredDequeue.cpp
@@ -1444,7 +1451,7 @@ add_definitions(-DBOOST_FILESYSTEM_VERSI
# Now create the config file from all the info learned above.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
${CMAKE_CURRENT_BINARY_DIR}/config.h)
-add_subdirectory(qpid/store)
+#add_subdirectory(qpid/store)
add_subdirectory(tests)
# Support for pkg-config
Modified: qpid/branches/asyncstore/cpp/src/asyncstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/asyncstore.cmake?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/asyncstore.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/asyncstore.cmake Mon Sep 24 13:49:13 2012
@@ -54,6 +54,7 @@ set (asyncStore_SOURCES
qpid/asyncStore/PersistableMessageContext.cpp
qpid/asyncStore/Plugin.cpp
qpid/asyncStore/QueueHandleImpl.cpp
+ qpid/asyncStore/RecoveryHandleImpl.cpp
qpid/asyncStore/RunState.cpp
qpid/asyncStore/TxnHandleImpl.cpp
qpid/broker/AsyncResultHandle.cpp
@@ -65,6 +66,8 @@ set (asyncStore_SOURCES
qpid/broker/MessageHandle.cpp
qpid/broker/QueueAsyncContext.cpp
qpid/broker/QueueHandle.cpp
+ qpid/broker/RecoveryAsyncContext.cpp
+ qpid/broker/RecoveryHandle.cpp
qpid/broker/SimpleDeliverable.cpp
qpid/broker/SimpleDeliveryRecord.cpp
qpid/broker/SimpleMessage.cpp
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.cpp Mon Sep 24 13:49:13 2012
@@ -25,6 +25,7 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/AsyncResultHandleImpl.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/TxnAsyncContext.h"
@@ -132,6 +133,29 @@ AsyncOpTxnAbort::getOpStr() const {
}
+// --- class AsyncOpRecover ---
+
+AsyncOpRecover::AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(rcvrCtxt), store),
+ m_rcvrHandle(rcvrHandle)
+{}
+
+AsyncOpRecover::~AsyncOpRecover() {}
+
+void
+AsyncOpRecover::executeOp() const {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpRecover::getOpStr() const {
+ return "RECOVER";
+}
+
+
// --- class AsyncOpConfigCreate ---
AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncOperation.h Mon Sep 24 13:49:13 2012
@@ -90,6 +90,18 @@ private:
};
+class AsyncOpRecover: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt,
+ qpid::broker::AsyncStore* store);
+ virtual ~AsyncOpRecover();
+ virtual void executeOp() const;
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::RecoveryHandle& m_rcvrHandle;
+};
+
class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp Mon Sep 24 13:49:13 2012
@@ -29,6 +29,7 @@
#include "qpid/asyncStore/EventHandleImpl.h"
#include "qpid/asyncStore/MessageHandleImpl.h"
#include "qpid/asyncStore/QueueHandleImpl.h"
+#include "qpid/asyncStore/RecoveryHandleImpl.h"
#include "qpid/asyncStore/TxnHandleImpl.h"
#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/EnqueueHandle.h"
@@ -36,6 +37,8 @@
#include "qpid/broker/MessageHandle.h"
#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueHandle.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
#include "qpid/broker/TxnAsyncContext.h"
#include "qpid/broker/TxnHandle.h"
@@ -48,12 +51,17 @@ AsyncStoreImpl::AsyncStoreImpl(boost::sh
m_opts(opts),
m_runState(),
m_operations(m_poller)
-{}
+{
+ QPID_LOG(info, "AsyncStoreImpl::AsyncStoreImpl()");
+}
AsyncStoreImpl::~AsyncStoreImpl() {}
void
-AsyncStoreImpl::initialize() {}
+AsyncStoreImpl::initialize(bool truncateFlag,
+ bool saveFlag) {
+ QPID_LOG(info, "AsyncStoreImpl::initialize() truncateFlag=" << (truncateFlag?"T":"F") << " saveFlag=" << (saveFlag?"T":"F"));
+}
uint64_t
AsyncStoreImpl::getNextRid() {
@@ -88,25 +96,42 @@ AsyncStoreImpl::createTxnHandle(const st
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this));
- TxnCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) {
+ assert(txnCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, txnCtxt, this));
+ txnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this));
- TxnCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) {
+ assert(txnCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, txnCtxt, this));
+ txnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this));
- TxnCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) {
+ assert(txnCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, txnCtxt, this));
+ txnCtxt->setOpStr(op->getOpStr());
+ m_operations.submit(op);
+}
+
+qpid::broker::RecoveryHandle
+AsyncStoreImpl::createRecoveryHandle() {
+ return qpid::broker::RecoveryHandle(new RecoveryHandleImpl());
+}
+
+void
+AsyncStoreImpl::submitRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt) {
+ assert(rcvrCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpRecover(rcvrHandle, rcvrCtxt, this));
+ rcvrCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -144,6 +169,7 @@ void
AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -152,6 +178,7 @@ AsyncStoreImpl::submitCreate(qpid::broke
void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -160,25 +187,28 @@ AsyncStoreImpl::submitDestroy(qpid::brok
void
AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -187,6 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broke
const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -196,6 +227,7 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -204,28 +236,30 @@ AsyncStoreImpl::submitDestroy(qpid::brok
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
-int
-AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
- qpid::broker::QueueHandle& /*queueHandle*/,
- char* /*data*/,
- uint64_t /*offset*/,
- const uint64_t /*length*/) {
- return 0;
-}
+//int
+//AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
+// qpid::broker::QueueHandle& /*queueHandle*/,
+// char* /*data*/,
+// uint64_t /*offset*/,
+// const uint64_t /*length*/) {
+// return 0;
+//}
}} // namespace qpid::asyncStore
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h Mon Sep 24 13:49:13 2012
@@ -47,7 +47,7 @@ public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
const AsyncStoreOptions& opts);
virtual ~AsyncStoreImpl();
- void initialize();
+ void initialize(bool truncateFlag = false, bool saveFlag = true);
uint64_t getNextRid(); // Global counter for journal RIDs
// --- Management ---
@@ -65,11 +65,17 @@ public:
qpid::broker::SimpleTxnBuffer* tb);
void submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt);
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt);
void submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
void submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
+
+
+ // --- Interface from AsyncRecoverable ---
+ qpid::broker::RecoveryHandle createRecoveryHandle();
+ void submitRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt);
// --- Interface from AsyncStore ---
@@ -112,12 +118,12 @@ public:
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
- // Legacy - Restore FTD message, is NOT async!
- virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
- qpid::broker::QueueHandle& queueHandle,
- char* data,
- uint64_t offset,
- const uint64_t length);
+// // Legacy - Restore FTD message, is NOT async!
+// virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
+// qpid::broker::QueueHandle& queueHandle,
+// char* data,
+// uint64_t offset,
+// const uint64_t length);
private:
boost::shared_ptr<qpid::sys::Poller> m_poller;
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/ConfigHandleImpl.h Mon Sep 24 13:49:13 2012
@@ -29,8 +29,7 @@
namespace qpid {
namespace asyncStore {
-class ConfigHandleImpl : public virtual qpid::RefCounted
-{
+class ConfigHandleImpl : public virtual qpid::RefCounted {
public:
ConfigHandleImpl();
virtual ~ConfigHandleImpl();
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/OperationQueue.cpp Mon Sep 24 13:49:13 2012
@@ -50,6 +50,8 @@ OperationQueue::OpQueue::Batch::const_it
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) {
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+// DEBUG: kpvdr
+std::cout << "#### OperationQueue::handle(): op=" << (*i)->getOpStr() << std::endl << std::flush;
(*i)->executeOp(); // Do store work here
}
} catch (const std::exception& e) {
Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/Plugin.cpp Mon Sep 24 13:49:13 2012
@@ -43,7 +43,7 @@ Plugin::earlyInitialize(Target& target)
}
m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options));
boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store);
- broker->setAsyncStore(brokerAsyncStore);
+ broker->setStore(brokerAsyncStore);
boost::function<void()> fn = boost::bind(&Plugin::finalize, this);
target.addFinalizer(fn);
QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir);
Added: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp Mon Sep 24 13:49:13 2012
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryHandleImpl.cpp
+ */
+
+#include "RecoveryHandleImpl.h"
+
+namespace qpid {
+namespace asyncStore {
+
+RecoveryHandleImpl::RecoveryHandleImpl() {}
+
+RecoveryHandleImpl::~RecoveryHandleImpl() {}
+
+}} // namespace qpid::asyncStore
Added: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h Mon Sep 24 13:49:13 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoverHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_RecoveryHandleImpl_h_
+#define qpid_asyncStore_RecoveryHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace asyncStore {
+
+class RecoveryHandleImpl: public virtual qpid::RefCounted {
+public:
+ RecoveryHandleImpl();
+ virtual ~RecoveryHandleImpl();
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_RecoveryHandleImpl_h_
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h Mon Sep 24 13:49:13 2012
@@ -20,6 +20,7 @@
#ifndef qpid_broker_AsyncStore_h_
#define qpid_broker_AsyncStore_h_
+#include "qpid/broker/RecoveryManager.h"
#include "qpid/types/Variant.h" // qpid::types::Variant::Map
#include <boost/shared_ptr.hpp>
@@ -65,9 +66,12 @@ class EnqueueHandle;
class EventHandle;
class MessageHandle;
class QueueHandle;
+class RecoveryHandle;
class TxnHandle;
+class InitAsyncContext;
class QueueAsyncContext;
+class RecoveryAsyncContext;
class TpcTxnAsyncContext;
class TxnAsyncContext;
class SimpleTxnBuffer;
@@ -92,10 +96,20 @@ public:
boost::shared_ptr<TxnAsyncContext>) = 0;
};
+class AsyncRecoverable {
+public:
+ virtual ~AsyncRecoverable() {}
+ virtual RecoveryHandle createRecoveryHandle() = 0;
+ virtual void submitRecover(qpid::broker::RecoveryHandle&,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext>) = 0;
+};
+
// Subclassed by store:
-class AsyncStore : public AsyncTransactionalStore {
+class AsyncStore : public AsyncTransactionalStore,
+ public AsyncRecoverable {
public:
virtual ~AsyncStore() {}
+ virtual void initialize(bool truncateFlag = false, bool saveFlag = true) = 0;
// --- Factory methods for creating handles ---
@@ -144,12 +158,12 @@ public:
TxnHandle&,
boost::shared_ptr<QueueAsyncContext>) = 0;
- // Legacy - Restore FTD message, is NOT async!
- virtual int loadContent(MessageHandle&,
- QueueHandle&,
- char* data,
- uint64_t offset,
- const uint64_t length) = 0;
+// // Legacy - Restore FTD message, is NOT async!
+// virtual int loadContent(MessageHandle&,
+// QueueHandle&,
+// char* data,
+// uint64_t offset,
+// const uint64_t length) = 0;
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp Mon Sep 24 13:49:13 2012
@@ -20,12 +20,16 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/ConfigAsyncContext.h"
+#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/MessageStoreModule.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/MessageStoreModule.h"
+//#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/RecoveryAsyncContext.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/broker/SecureConnectionFactory.h"
@@ -34,6 +38,8 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
@@ -190,7 +196,9 @@ Broker::Broker(const Broker::Options& co
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
- store(new NullMessageStore),
+// store(new NullMessageStore),
+// asyncStore(0),
+ asyncResultQueue(poller),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -270,23 +278,31 @@ Broker::Broker(const Broker::Options& co
MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
- if (NullMessageStore::isNullStore(store.get()))
- setStore();
+// if (NullMessageStore::isNullStore(store.get()))
+// setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- if (store.get() != 0) {
+// if (store.get() != 0) {
+ if (asyncStore.get() != 0) {
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
+ QPID_LOG(info, "Store recovery starting")
RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
- store->recover(recoverer);
+ RecoveryHandle rh = asyncStore->createRecoveryHandle();
+ boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+ asyncStore->submitRecover(rh, rac);
+// store->recover(recoverer);
}
else {
QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
- store->truncateInit(true); // save old files in subdir
+// store->truncateInit(true); // save old files in subdir
+ asyncStore->initialize(true, true);
}
}
+// debug
+ else QPID_LOG(info, ">>>> No store!!!!")
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -357,10 +373,14 @@ Broker::Broker(const Broker::Options& co
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
- bool storeEnabled = store.get() != NULL;
+// bool storeEnabled = store.get() != NULL;
+ bool storeEnabled = asyncStore.get() != NULL;
std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
if (status.second && storeEnabled) {
- store->create(*status.first, framing::FieldTable ());
+// store->create(*status.first, framing::FieldTable ());
+ ConfigHandle ch = asyncStore->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitCreate(ch, status.first.get(), bc);
}
}
@@ -377,23 +397,39 @@ boost::intrusive_ptr<Broker> Broker::cre
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
-{
- store.reset(new MessageStoreModule (_store));
- setStore();
-}
+//void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+//{
+// store.reset(new MessageStoreModule (_store));
+// setStore();
+//}
-void Broker::setAsyncStore(boost::shared_ptr<AsyncStore>& /*asyncStore*/)
+void Broker::setStore(boost::shared_ptr<AsyncStore>& _asyncStore)
{
- // TODO: Provide implementation for async store interface
+// asyncStore.reset(_asyncStore.get());
+ asyncStore = _asyncStore;
+ setStore();
}
void Broker::setStore () {
- queues.setStore (store.get());
- dtxManager.setStore (store.get());
- links.setStore (store.get());
+// queues.setStore (store.get());
+ queues.setStore(asyncStore.get());
+// dtxManager.setStore (store.get());
+ dtxManager.setStore(asyncStore.get());
+// links.setStore (store.get());
+ links.setStore(asyncStore.get());
+}
+
+// static
+void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+// static
+void Broker::configureComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
+
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -926,7 +962,7 @@ Manageable::status_t Broker::queryQueue(
return Manageable::STATUS_UNKNOWN_OBJECT;
}
q->query( results );
- return Manageable::STATUS_OK;;
+ return Manageable::STATUS_OK;
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
@@ -1168,7 +1204,10 @@ std::pair<Exchange::shared_ptr, bool> Br
alternate->incAlternateUsers();
}
if (durable) {
- store->create(*result.first, arguments);
+// store->create(*result.first, arguments);
+ ConfigHandle ch = asyncStore->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitCreate(ch, result.first.get(), bc);
}
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1208,7 +1247,11 @@ void Broker::deleteExchange(const std::s
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) store->destroy(*exchange);
+// if (exchange->isDurable()) store->destroy(*exchange);
+ if (exchange->isDurable()) {
+// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ }
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1285,7 +1328,8 @@ void Broker::unbind(const std::string& q
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
- store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ // TODO: kpvdr: Async config destroy here
}
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h Mon Sep 24 13:49:13 2012
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/broker/AsyncResultQueueImpl.h"
#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionFactory.h"
@@ -29,7 +30,7 @@
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionManager.h"
@@ -138,6 +139,8 @@ class Broker : public sys::Runnable, pub
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
+ static void recoverComplete(const AsyncResultHandle* const);
+ static void configureComplete(const AsyncResultHandle* const);
void setLogLevel(const std::string& level);
std::string getLogLevel();
void createObject(const std::string& type, const std::string& name,
@@ -160,7 +163,10 @@ class Broker : public sys::Runnable, pub
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
- std::auto_ptr<MessageStore> store;
+// std::auto_ptr<MessageStore> store;
+// std::auto_ptr<AsyncStore> asyncStore;
+ boost::shared_ptr<AsyncStore> asyncStore;
+ AsyncResultQueueImpl asyncResultQueue;
AclModule* acl;
DataDir dataDir;
ConnectionObservers connectionObservers;
@@ -213,9 +219,10 @@ class Broker : public sys::Runnable, pub
/** Shut down the broker */
QPID_BROKER_EXTERN virtual void shutdown();
- QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
- void setAsyncStore(boost::shared_ptr<AsyncStore>& asyncStore);
- MessageStore& getStore() { return *store; }
+// QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
+ void setStore(boost::shared_ptr<AsyncStore>& asyncStore);
+// MessageStore& getStore() { return *store; }
+ AsyncStore& getStore() { return *asyncStore; }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
QueueRegistry& getQueues() { return queues; }
Added: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.cpp Mon Sep 24 13:49:13 2012
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigAsyncContext.cpp
+ */
+
+#include "ConfigAsyncContext.h"
+
+namespace qpid {
+namespace broker {
+
+ConfigAsyncContext::ConfigAsyncContext(AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_rcb(rcb),
+ m_arq(arq)
+{}
+
+ConfigAsyncContext::~ConfigAsyncContext() {}
+
+AsyncResultQueue*
+ConfigAsyncContext::getAsyncResultQueue() const {
+ return m_arq;
+}
+
+void
+ConfigAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const {
+ if (m_rcb) {
+ m_rcb(arh);
+ }
+}
+
+}} // namespace qpid
Added: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h?rev=1389378&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigAsyncContext.h Mon Sep 24 13:49:13 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigAsyncContext.h
+ */
+
+#ifndef qpid_broker_ConfigAsyncContext_h_
+#define qpid_broker_ConfigAsyncContext_h_
+
+#include "AsyncStore.h"
+
+namespace qpid {
+namespace broker {
+class AsyncResultHandle;
+class AsyncResultQueue;
+
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
+class ConfigAsyncContext: public qpid::broker::BrokerAsyncContext
+{
+public:
+ ConfigAsyncContext(AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ virtual ~ConfigAsyncContext();
+ virtual AsyncResultQueue* getAsyncResultQueue() const;
+ virtual void invokeCallback(const AsyncResultHandle* const) const;
+
+private:
+ AsyncResultCallback m_rcb;
+ AsyncResultQueue* const m_arq;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_ConfigAsyncContext_h_
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp Mon Sep 24 13:49:13 2012
@@ -199,3 +199,10 @@ bool DirectExchange::isBound(Queue::shar
DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");
+
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t DirectExchange::getSize() {
+ return 0;
+}
+void DirectExchange::write(char* /*target*/) {}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.h Mon Sep 24 13:49:13 2012
@@ -65,6 +65,11 @@ public:
QPID_BROKER_EXTERN virtual ~DirectExchange();
virtual bool supportsDynamicBinding() { return true; }
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp Mon Sep 24 13:49:13 2012
@@ -35,7 +35,8 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+//DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : asyncTxnStore(0), timer(&t) {}
DtxManager::~DtxManager() {}
@@ -124,7 +125,8 @@ DtxWorkRecord* DtxManager::createWork(co
throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)"));
} else {
std::string ncxid = xid; // Work around const correctness problems in ptr_map.
- return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+// return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+ return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, asyncTxnStore)).first);
}
}
@@ -172,9 +174,11 @@ void DtxManager::DtxCleanup::fire()
}
}
-void DtxManager::setStore (TransactionalStore* _store)
+//void DtxManager::setStore (TransactionalStore* _store)
+void DtxManager::setStore (AsyncTransactionalStore* _ats)
{
- store = _store;
+// store = _store;
+ asyncTxnStore = _ats;
}
std::string DtxManager::convert(const qpid::framing::Xid& xid)
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h Mon Sep 24 13:49:13 2012
@@ -22,9 +22,10 @@
#define _DtxManager_
#include <boost/ptr_container/ptr_map.hpp>
+#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxWorkRecord.h"
-#include "qpid/broker/TransactionalStore.h"
+//#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/Xid.h"
#include "qpid/sys/Mutex.h"
@@ -46,7 +47,8 @@ class DtxManager{
};
WorkMap work;
- TransactionalStore* store;
+// TransactionalStore* store;
+ AsyncTransactionalStore* asyncTxnStore;
qpid::sys::Mutex lock;
qpid::sys::Timer* timer;
@@ -65,7 +67,8 @@ public:
void setTimeout(const std::string& xid, uint32_t secs);
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
- void setStore(TransactionalStore* store);
+// void setStore(TransactionalStore* store);
+ void setStore(AsyncTransactionalStore* ats);
void setTimer(sys::Timer& t) { timer = &t; }
// Used by cluster for replication.
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp Mon Sep 24 13:49:13 2012
@@ -29,8 +29,8 @@ using qpid::sys::Mutex;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
- xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, AsyncTransactionalStore* const _ats) :
+ xid(_xid), asyncTxnStore(_ats), completed(false), rolledback(false), prepared(false), expired(false) {}
DtxWorkRecord::~DtxWorkRecord()
{
@@ -43,14 +43,15 @@ bool DtxWorkRecord::prepare()
{
Mutex::ScopedLock locker(lock);
if (check()) {
- txn = store->begin(xid);
- if (prepare(txn.get())) {
- store->prepare(*txn);
- prepared = true;
- } else {
- abort();
- //TODO: this should probably be flagged as internal error
- }
+// txn = asyncTxnStore->begin(xid);
+// if (prepare(txn.get())) {
+// asyncTxnStore->prepare(*txn);
+// prepared = true;
+// } else {
+// abort();
+// //TODO: this should probably be flagged as internal error
+// }
+ // TODO: kpvdr: Async transaction prepare here
} else {
//some part of the work has been marked rollback only
abort();
@@ -67,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionC
return succeeded;
}
-bool DtxWorkRecord::commit(bool onePhase)
+bool DtxWorkRecord::commit(bool onePhase) // why is onePhase necessary if prepared already contains the necessary state?
{
Mutex::ScopedLock locker(lock);
if (check()) {
@@ -77,7 +78,8 @@ bool DtxWorkRecord::commit(bool onePhase
throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been prepared, one-phase option not valid!"));
}
- store->commit(*txn);
+// asyncTxnStore->commit(*txn);
+ // TODO: kpvdr: Async transaction commit here
txn.reset();
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
@@ -87,17 +89,20 @@ bool DtxWorkRecord::commit(bool onePhase
if (!onePhase) {
throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has not been prepared, one-phase option required!"));
}
- std::auto_ptr<TransactionContext> localtxn = store->begin();
- if (prepare(localtxn.get())) {
- store->commit(*localtxn);
- std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
- return true;
- } else {
- store->abort(*localtxn);
- abort();
- //TODO: this should probably be flagged as internal error
- return false;
- }
+// std::auto_ptr<TransactionContext> localtxn = asyncTxnStore->begin();
+// if (prepare(localtxn.get())) {
+// asyncTxnStore->commit(*localtxn);
+// std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+// return true;
+// } else {
+// asyncTxnStore->abort(*localtxn);
+// abort();
+// //TODO: this should probably be flagged as internal error
+// return false;
+// }
+ // TODO: kpvdr: Local transaction async prepare and commit here
+ // temp return value:
+ return true;
}
} else {
//some part of the work has been marked rollback only
@@ -147,7 +152,8 @@ bool DtxWorkRecord::check()
void DtxWorkRecord::abort()
{
if (txn.get()) {
- store->abort(*txn);
+// asyncTxnStore->abort(*txn);
+ // TODO: kpvdr: Async transaction abore here
txn.reset();
}
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h Mon Sep 24 13:49:13 2012
@@ -21,6 +21,7 @@
#ifndef _DtxWorkRecord_
#define _DtxWorkRecord_
+#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxTimeout.h"
@@ -48,7 +49,8 @@ class DtxWorkRecord
typedef std::vector<DtxBuffer::shared_ptr> Work;
const std::string xid;
- TransactionalStore* const store;
+// TransactionalStore* const store;
+ AsyncTransactionalStore* const asyncTxnStore;
bool completed;
bool rolledback;
bool prepared;
@@ -63,7 +65,8 @@ class DtxWorkRecord
bool prepare(TransactionContext* txn);
public:
QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid,
- TransactionalStore* const store);
+// TransactionalStore* const store);
+ AsyncTransactionalStore* const store);
QPID_BROKER_EXTERN ~DtxWorkRecord();
QPID_BROKER_EXTERN bool prepare();
QPID_BROKER_EXTERN bool commit(bool onePhase);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Mon Sep 24 13:49:13 2012
@@ -23,10 +23,11 @@
*/
#include <boost/shared_ptr.hpp>
+#include <qpid/broker/AsyncStore.h>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Mutex.h"
@@ -35,13 +36,15 @@
#include "qmf/org/apache/qpid/broker/Binding.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
+#include <set>
+
namespace qpid {
namespace broker {
class Broker;
class ExchangeRegistry;
-class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable {
public:
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h Mon Sep 24 13:49:13 2012
@@ -24,7 +24,7 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp Mon Sep 24 13:49:13 2012
@@ -120,3 +120,11 @@ bool FanOutExchange::isBound(Queue::shar
FanOutExchange::~FanOutExchange() {}
const std::string FanOutExchange::typeName("fanout");
+
+
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t FanOutExchange::getSize() {
+ return 0;
+}
+void FanOutExchange::write(char* /*target*/) {}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.h Mon Sep 24 13:49:13 2012
@@ -62,6 +62,11 @@ class FanOutExchange : public virtual Ex
QPID_BROKER_EXTERN virtual ~FanOutExchange();
virtual bool supportsDynamicBinding() { return true; }
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
};
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp Mon Sep 24 13:49:13 2012
@@ -418,3 +418,9 @@ bool HeadersExchange::FedUnbindModifier:
return true;
}
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t HeadersExchange::getSize() {
+ return 0;
+}
+void HeadersExchange::write(char* /*target*/) {}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h Mon Sep 24 13:49:13 2012
@@ -107,6 +107,11 @@ class HeadersExchange : public virtual E
static QPID_BROKER_EXTERN bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp Mon Sep 24 13:49:13 2012
@@ -115,6 +115,11 @@ public:
link = _link;
}
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize() { return 0; }
+ void write(char* /*target*/) {}
+
+
private:
Link *link;
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp Mon Sep 24 13:49:13 2012
@@ -42,7 +42,8 @@ namespace _qmf = qmf::org::apache::qpid:
// factored: The persistence element should be factored separately
LinkRegistry::LinkRegistry () :
broker(0),
- parent(0), store(0), passive(false),
+// parent(0), store(0), passive(false),
+ parent(0), asyncStore(0), passive(false),
realm("")
{
}
@@ -59,7 +60,7 @@ class LinkRegistryConnectionObserver : p
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker),
- parent(0), store(0), passive(false),
+ parent(0), asyncStore(0), passive(false),
realm(broker->getOptions().realm)
{
broker->getConnectionObservers().add(
@@ -117,7 +118,11 @@ pair<Link::shared_ptr, bool> LinkRegistr
boost::bind(&LinkRegistry::linkDestroyed, this, _1),
durable, authMechanism, username, password, broker,
parent, failover));
- if (durable && store) store->create(*link);
+// if (durable && store) store->create(*link);
+ if (durable && asyncStore) {
+// store->create(*link);
+ // TODO: kpvdr: async create config (link)
+ }
links[name] = link;
pendingLinks[name] = link;
QPID_LOG(debug, "Creating new link; name=" << name );
@@ -213,8 +218,11 @@ pair<Bridge::shared_ptr, bool> LinkRegis
args, init, queueName, altExchange));
bridges[name] = bridge;
link.add(bridge);
- if (durable && store)
- store->create(*bridge);
+// if (durable && store)
+ if (durable && asyncStore) {
+// store->create(*bridge);
+ // TODO: kpvdr: Async create config (bridge)
+ }
QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
"' from " << src << " to " << dest << " (" << key << ")");
@@ -234,8 +242,11 @@ void LinkRegistry::linkDestroyed(Link *l
LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
- if (i->second->isDurable() && store)
- store->destroy(*(i->second));
+// if (i->second->isDurable() && store)
+ if (i->second->isDurable() && asyncStore) {
+// store->destroy(*(i->second));
+ // TODO: kpvdr: Async destroy config (link)
+ }
links.erase(i);
}
}
@@ -254,18 +265,22 @@ void LinkRegistry::destroyBridge(Bridge
if (link) {
link->cancel(b->second);
}
- if (b->second->isDurable())
- store->destroy(*(b->second));
+// if (b->second->isDurable())
+ if (b->second->isDurable()) {
+// store->destroy(*(b->second));
+ // TODO: kpvdr: Async destroy config (bridge)
+ }
bridges.erase(b);
}
-void LinkRegistry::setStore (MessageStore* _store)
-{
- store = _store;
+//void LinkRegistry::setStore (MessageStore* _store)
+void LinkRegistry::setStore (AsyncStore* _asyncStore) {
+ asyncStore = _asyncStore;
}
-MessageStore* LinkRegistry::getStore() const {
- return store;
+//MessageStore* LinkRegistry::getStore() const {
+AsyncStore* LinkRegistry::getStore() const {
+ return asyncStore;
}
namespace {
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h Mon Sep 24 13:49:13 2012
@@ -25,7 +25,7 @@
#include <map>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Bridge.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
@@ -52,7 +52,8 @@ namespace broker {
qpid::sys::Mutex lock;
Broker* broker;
management::Manageable* parent;
- MessageStore* store;
+// MessageStore* store;
+ AsyncStore* asyncStore;
bool passive;
std::string realm;
@@ -130,12 +131,14 @@ namespace broker {
/**
* Set the store to use. May only be called once.
*/
- QPID_BROKER_EXTERN void setStore (MessageStore*);
+// QPID_BROKER_EXTERN void setStore (MessageStore*);
+ QPID_BROKER_EXTERN void setStore (AsyncStore*);
/**
* Return the message store used.
*/
- QPID_BROKER_EXTERN MessageStore* getStore() const;
+// QPID_BROKER_EXTERN MessageStore* getStore() const;
+ QPID_BROKER_EXTERN AsyncStore* getStore() const;
QPID_BROKER_EXTERN std::string getAuthMechanism (const std::string& key);
QPID_BROKER_EXTERN std::string getAuthCredentials (const std::string& key);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.cpp Mon Sep 24 13:49:13 2012
@@ -33,8 +33,10 @@ bool isLowerPriorityThan(uint8_t priorit
}
}
-LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
- : Queue(n, s, ms, p, b) {}
+//LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+// : Queue(n, s, ms, p, b) {}
+LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+ : Queue(n, s, as, p, b) {}
bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LossyQueue.h Mon Sep 24 13:49:13 2012
@@ -32,7 +32,8 @@ namespace broker {
class LossyQueue : public Queue
{
public:
- LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+// LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+ LossyQueue(const std::string&, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*);
bool checkDepth(const QueueDepth& increment, const Message&);
private:
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.cpp Mon Sep 24 13:49:13 2012
@@ -25,8 +25,10 @@
namespace qpid {
namespace broker {
-Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
- : Queue(n, s, ms, p, b), messageMap(*m)
+//Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+// : Queue(n, s, ms, p, b), messageMap(*m)
+Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+ : Queue(n, s, as, p, b), messageMap(*m)
{
messages = m;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Lvq.h Mon Sep 24 13:49:13 2012
@@ -35,7 +35,8 @@ class MessageMap;
class Lvq : public Queue
{
public:
- Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+// Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+ Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*);
void push(Message& msg, bool isRecovery=false);
private:
MessageMap& messageMap;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStore.h Mon Sep 24 13:49:13 2012
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#error "deprecated in favor of AsyncStore"
+
#ifndef _MessageStore_
#define _MessageStore_
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.cpp Mon Sep 24 13:49:13 2012
@@ -19,6 +19,8 @@
*
*/
+#error "deprecated in favor of AsyncStore"
+
#include "qpid/broker/MessageStoreModule.h"
#include "qpid/broker/NullMessageStore.h"
#include <iostream>
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageStoreModule.h Mon Sep 24 13:49:13 2012
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#error "deprecated in favor of AsyncStore"
+
#ifndef _MessageStoreModule_
#define _MessageStoreModule_
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.cpp Mon Sep 24 13:49:13 2012
@@ -19,6 +19,8 @@
*
*/
+#error "deprecated in favor of AsyncStore"
+
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/MessageStoreModule.h"
#include "qpid/broker/RecoveryManager.h"
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/NullMessageStore.h Mon Sep 24 13:49:13 2012
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#error "deprected in favor of AsyncStore"
+
#ifndef _NullMessageStore_
#define _NullMessageStore_
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.cpp Mon Sep 24 13:49:13 2012
@@ -64,19 +64,19 @@ void PersistableMessage::flush()
//TODO: is this really the right place for this?
}
-// deprecated
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
-{
- enqueueStart();
-}
+//// deprecated
+//void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+//{
+// enqueueStart();
+//}
void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*)
{
enqueueStart();
}
-// deprecated
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
+//// deprecated
+//void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PersistableMessage.h Mon Sep 24 13:49:13 2012
@@ -80,16 +80,16 @@ class PersistableMessage : public Persis
QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
- QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
- MessageStore* _store);
+// QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
+// MessageStore* _store);
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
AsyncStore* _store);
QPID_BROKER_EXTERN bool isDequeueComplete();
QPID_BROKER_EXTERN void dequeueComplete();
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
- MessageStore* _store);
+// QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
+// MessageStore* _store);
QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
AsyncStore* _store);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp Mon Sep 24 13:49:13 2012
@@ -26,11 +26,11 @@
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
@@ -165,12 +165,14 @@ void Queue::TxPublish::rollback() throw(
}
Queue::Queue(const string& _name, const QueueSettings& _settings,
- MessageStore* const _store,
+// MessageStore* const _store,
+ AsyncStore* const _asyncStore,
Manageable* parent,
Broker* b) :
name(_name),
- store(_store),
+// store(_store),
+ asyncStore(_asyncStore),
owner(0),
consumerCount(0),
browserCount(0),
@@ -198,9 +200,11 @@ Queue::Queue(const string& _name, const
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete);
mgmtObject->set_arguments(settings.asMap());
- agent->addObject(mgmtObject, 0, store != 0);
+// agent->addObject(mgmtObject, 0, store != 0);
+ agent->addObject(mgmtObject, 0, asyncStore != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
@@ -787,7 +791,7 @@ void Queue::setLastNodeFailure()
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
*/
-bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
+bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
@@ -807,13 +811,16 @@ bool Queue::enqueue(TransactionContext*
msg.addTraceId(settings.traceId);
}
- if (msg.isPersistent() && store) {
+// if (msg.isPersistent() && store) {
+ if (msg.isPersistent() && asyncStore) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
- pmsg->enqueueAsync(shared_from_this(), store);
- store->enqueue(ctxt, pmsg, *this);
+// pmsg->enqueueAsync(shared_from_this(), store);
+ pmsg->enqueueAsync(shared_from_this(), asyncStore);
+// store->enqueue(ctxt, pmsg, *this);
+ // TODO - kpvdr: async enqueue here
}
return true;
}
@@ -858,8 +865,10 @@ void Queue::dequeueCommited(const Messag
void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
{
ScopedUse u(barrier);
- if (u.acquired && msg && store) {
- store->dequeue(0, msg, *this);
+// if (u.acquired && msg && store) {
+ if (u.acquired && msg && asyncStore) {
+// store->dequeue(0, msg, *this);
+ // TODO: kpvdr: async dequeue here
}
}
@@ -881,8 +890,10 @@ void Queue::dequeue(TransactionContext*
return;
}
}
- if (store && pmsg) {
- store->dequeue(ctxt, pmsg, *this);
+// if (store && pmsg) {
+ if (asyncStore && pmsg) {
+// store->dequeue(ctxt, pmsg, *this);
+ // TODO: kpvdr: async dequeue here
}
}
@@ -983,8 +994,10 @@ void Queue::observeConsumerRemove( const
void Queue::create()
{
- if (store) {
- store->create(*this, settings.storeSettings);
+// if (store) {
+ if (asyncStore) {
+// store->create(*this, settings.storeSettings);
+ // TODO: kpvdr: async store create here
}
}
@@ -1051,11 +1064,16 @@ void Queue::destroyed()
alternateExchange->decAlternateUsers();
}
- if (store) {
+// if (store) {
+ if (asyncStore) {
barrier.destroy();
- store->flush(*this);
- store->destroy(*this);
- store = 0;//ensure we make no more calls to the store for this queue
+// store->flush(*this);
+ // TODO: kpvdr: async flush here
+// store->destroy(*this);
+ // TODO: kpvdr: async destroy here
+// store = 0;//ensure we make no more calls to the store for this queue
+ // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
+ // will cause store to be destroyed when all outstanding async ops are complete.
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
@@ -1444,7 +1462,9 @@ void Queue::removeObserver(boost::shared
void Queue::flush()
{
ScopedUse u(barrier);
- if (u.acquired && store) store->flush(*this);
+// if (u.acquired && store) store->flush(*this);
+ // TODO: kpvdr: Async store flush here
+ if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
}
@@ -1454,7 +1474,8 @@ bool Queue::bind(boost::shared_ptr<Excha
if (exchange->bind(shared_from_this(), key, &arguments)) {
bound(exchange->getName(), key, arguments);
if (exchange->isDurable() && isDurable()) {
- store->bind(*exchange, *this, key, arguments);
+// store->bind(*exchange, *this, key, arguments);
+ // TODO: kpvdr: Store configuration here
}
return true;
} else {
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h?rev=1389378&r1=1389377&r2=1389378&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h Mon Sep 24 13:49:13 2012
@@ -59,7 +59,7 @@ namespace qpid {
namespace broker {
class Broker;
class Exchange;
-class MessageStore;
+//class MessageStore;
class QueueDepth;
class QueueEvents;
class QueueRegistry;
@@ -115,7 +115,8 @@ class Queue : public boost::enable_share
typedef boost::function1<void, Message&> MessageFunctor;
const std::string name;
- MessageStore* store;
+// MessageStore* store;
+ AsyncStore* asyncStore;
const OwnershipToken* owner;
uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
uint32_t browserCount; // Count of non-acquiring subscriptions.
@@ -201,7 +202,8 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN Queue(const std::string& name,
const QueueSettings& settings = QueueSettings(),
- MessageStore* const store = 0,
+// MessageStore* const store = 0,
+ AsyncStore* const asyncStore = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
QPID_BROKER_EXTERN virtual ~Queue();
@@ -286,7 +288,8 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
- inline bool isDurable() const { return store != 0; }
+// inline bool isDurable() const { return store != 0; }
+ inline bool isDurable() const { return asyncStore != 0; }
inline const QueueSettings& getSettings() const { return settings; }
inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
inline bool isAutoDelete() const { return settings.autodelete; }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org