You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/12/04 22:54:04 UTC
svn commit: r601099 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker:
Broker.cpp Broker.h MessageStore.h MessageStoreModule.cpp
MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h
Author: kpvdr
Date: Tue Dec 4 13:54:03 2007
New Revision: 601099
URL: http://svn.apache.org/viewvc?rev=601099&view=rev
Log:
Added options to the broker for the number of journal files and the journal file size. Also brought back exception copying in MessageStoreModule to prevent exceptions thrown in the store lib from causing core dumps when handled in qpidd.cpp.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Dec 4 13:54:03 2007
@@ -72,6 +72,8 @@
storeDir("/var"),
storeAsync(false),
storeForce(false),
+ numJrnlFiles(8),
+ jrnlFsizePgs(24),
enableMgmt(0),
mgmtPubInterval(10),
ack(0)
@@ -89,14 +91,20 @@
"Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"),
"Stages messages over N bytes to disk")
+// TODO: These options need to come from within the store module
("store,s", optValue(store,"LIBNAME"),
"Tells the broker to use the message store shared library LIBNAME for persistence")
("store-directory", optValue(storeDir,"DIR"),
"Store directory location for persistence.")
("store-async", optValue(storeAsync,"yes|no"),
- "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
+ "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
("store-force", optValue(storeForce,"yes|no"),
- "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this")
+ "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
+ ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
+ "Number of files in persistence journal")
+ ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+ "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
+// End of store module options
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
@@ -152,7 +160,7 @@
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if(store.get()) {
- if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){
+ if (!store->init(&conf)){
throw Exception( "Existing Journal in different mode, backup/move existing data \
before changing modes. Or use --store-force yes to blow existing data away.");
}else{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Dec 4 13:54:03 2007
@@ -71,6 +71,8 @@
string storeDir;
bool storeAsync;
bool storeForce;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
bool enableMgmt;
uint16_t mgmtPubInterval;
uint32_t ack;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Tue Dec 4 13:54:03 2007
@@ -22,6 +22,7 @@
#define _MessageStore_
#include <boost/shared_ptr.hpp>
+#include <qpid/Options.h>
#include "PersistableExchange.h"
#include "PersistableMessage.h"
#include "PersistableQueue.h"
@@ -47,7 +48,7 @@
* @param async true, enable async, false, enable sync
* @param force true, delete data on mode change, false, error on mode change
*/
- virtual bool init(const std::string& dir, const bool async, const bool force) = 0;
+ virtual bool init(const Options* options) = 0;
/**
* Record the existence of a durable queue
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Tue Dec 4 13:54:03 2007
@@ -22,121 +22,124 @@
#include "MessageStoreModule.h"
#include <iostream>
+// This transfer protects against the unloading of the store lib prior to the handling of the exception
+#define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); }
+
using namespace qpid::broker;
MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
{
}
-bool MessageStoreModule::init(const std::string& dir, const bool async, const bool force)
+bool MessageStoreModule::init(const Options* options)
{
- return store->init(dir, async, force);
+ TRANSFER_EXCEPTION(return store->init(options));
}
void MessageStoreModule::create(PersistableQueue& queue)
{
- store->create(queue);
+ TRANSFER_EXCEPTION(store->create(queue));
}
void MessageStoreModule::destroy(PersistableQueue& queue)
{
- store->destroy(queue);
+ TRANSFER_EXCEPTION(store->destroy(queue));
}
void MessageStoreModule::create(const PersistableExchange& exchange)
{
- store->create(exchange);
+ TRANSFER_EXCEPTION(store->create(exchange));
}
void MessageStoreModule::destroy(const PersistableExchange& exchange)
{
- store->destroy(exchange);
+ TRANSFER_EXCEPTION(store->destroy(exchange));
}
void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q,
const std::string& k, const framing::FieldTable& a)
{
- store->bind(e, q, k, a);
+ TRANSFER_EXCEPTION(store->bind(e, q, k, a));
}
void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q,
const std::string& k, const framing::FieldTable& a)
{
- store->unbind(e, q, k, a);
+ TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
}
void MessageStoreModule::recover(RecoveryManager& registry)
{
- store->recover(registry);
+ TRANSFER_EXCEPTION(store->recover(registry));
}
void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg)
{
- store->stage(msg);
+ TRANSFER_EXCEPTION(store->stage(msg));
}
void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
{
- store->destroy(msg);
+ TRANSFER_EXCEPTION(store->destroy(msg));
}
void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
{
- store->appendContent(msg, data);
+ TRANSFER_EXCEPTION(store->appendContent(msg, data));
}
void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue,
intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length)
{
- store->loadContent(queue, msg, data, offset, length);
+ TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length));
}
void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
- store->enqueue(ctxt, msg, queue);
+ TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue));
}
void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
- store->dequeue(ctxt, msg, queue);
+ TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
}
void MessageStoreModule::flush(const qpid::broker::PersistableQueue& queue)
{
- store->flush(queue);
+ TRANSFER_EXCEPTION(store->flush(queue));
}
u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue)
{
- return store->outstandingQueueAIO(queue);
+ TRANSFER_EXCEPTION(return store->outstandingQueueAIO(queue));
}
std::auto_ptr<TransactionContext> MessageStoreModule::begin()
{
- return store->begin();
+ TRANSFER_EXCEPTION(return store->begin());
}
std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid)
{
- return store->begin(xid);
+ TRANSFER_EXCEPTION(return store->begin(xid));
}
void MessageStoreModule::prepare(TPCTransactionContext& txn)
{
- store->prepare(txn);
+ TRANSFER_EXCEPTION(store->prepare(txn));
}
void MessageStoreModule::commit(TransactionContext& ctxt)
{
- store->commit(ctxt);
+ TRANSFER_EXCEPTION(store->commit(ctxt));
}
void MessageStoreModule::abort(TransactionContext& ctxt)
{
- store->abort(ctxt);
+ TRANSFER_EXCEPTION(store->abort(ctxt));
}
void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids)
{
- store->collectPreparedXids(xids);
+ TRANSFER_EXCEPTION(store->collectPreparedXids(xids));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Tue Dec 4 13:54:03 2007
@@ -38,7 +38,7 @@
public:
MessageStoreModule(const std::string& name);
- bool init(const std::string& dir, const bool async, const bool force);
+ bool init(const Options* options);
std::auto_ptr<TransactionContext> begin();
std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
void prepare(TPCTransactionContext& txn);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Dec 4 13:54:03 2007
@@ -49,7 +49,7 @@
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-bool NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/, const bool /*force*/) {return true;}
+bool NullMessageStore::init(const Options* /*options*/) {return true;}
void NullMessageStore::create(PersistableQueue& queue)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Dec 4 13:54:03 2007
@@ -38,7 +38,7 @@
public:
NullMessageStore(bool warn = false);
- virtual bool init(const std::string& dir, const bool async, const bool force);
+ virtual bool init(const Options* options);
virtual std::auto_ptr<TransactionContext> begin();
virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
virtual void prepare(TPCTransactionContext& txn);