You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/12/04 22:54:04 UTC

svn commit: r601099 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp Broker.h MessageStore.h MessageStoreModule.cpp MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h

Author: kpvdr
Date: Tue Dec  4 13:54:03 2007
New Revision: 601099

URL: http://svn.apache.org/viewvc?rev=601099&view=rev
Log:
Added options to broker for journal file size. Also brought back exception copying in MessageStoreModule to prevent exceptions thrown in the store lib causing core dumps when handled in qpidd.cpp.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Dec  4 13:54:03 2007
@@ -72,6 +72,8 @@
     storeDir("/var"),
     storeAsync(false),
     storeForce(false),
+    numJrnlFiles(8),
+    jrnlFsizePgs(24),
     enableMgmt(0),
     mgmtPubInterval(10),
     ack(0)
@@ -89,14 +91,20 @@
          "Sets the connection backlog limit for the server socket")
         ("staging-threshold", optValue(stagingThreshold, "N"),
          "Stages messages over N bytes to disk")
+// TODO: These options need to come from within the store module
         ("store,s", optValue(store,"LIBNAME"),
          "Tells the broker to use the message store shared library LIBNAME for persistence")
         ("store-directory", optValue(storeDir,"DIR"),
          "Store directory location for persistence.")
         ("store-async", optValue(storeAsync,"yes|no"),
-         "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
+         "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
         ("store-force", optValue(storeForce,"yes|no"),
-         "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this")
+         "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
+        ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
+         "Number of files in persistence journal")
+        ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+         "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
+// End of store module options
         ("mgmt,m", optValue(enableMgmt,"yes|no"),
          "Enable Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
@@ -152,7 +160,7 @@
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     
     if(store.get()) {
-        if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){
+        if (!store->init(&conf)){
               throw Exception( "Existing Journal in different mode, backup/move existing data \
 			  before changing modes. Or use --store-force yes to blow existing data away.");
 		}else{

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Dec  4 13:54:03 2007
@@ -71,6 +71,8 @@
         string storeDir;
         bool storeAsync;
         bool storeForce;
+        u_int16_t numJrnlFiles;
+        u_int32_t jrnlFsizePgs;
         bool enableMgmt;
         uint16_t mgmtPubInterval;
         uint32_t ack;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Tue Dec  4 13:54:03 2007
@@ -22,6 +22,7 @@
 #define _MessageStore_
 
 #include <boost/shared_ptr.hpp>
+#include <qpid/Options.h>
 #include "PersistableExchange.h"
 #include "PersistableMessage.h"
 #include "PersistableQueue.h"
@@ -47,7 +48,7 @@
      * @param async true, enable async, false, enable sync
      * @param force true, delete data on mode change, false, error on mode change
 	 */
-	virtual bool init(const std::string& dir, const bool async, const bool force) = 0;
+	virtual bool init(const Options* options) = 0;
 
     /**
      * Record the existence of a durable queue

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Tue Dec  4 13:54:03 2007
@@ -22,121 +22,124 @@
 #include "MessageStoreModule.h"
 #include <iostream>
 
+// This transfer protects against the unloading of the store lib prior to the handling of the exception
+#define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); }
+
 using namespace qpid::broker;
 
 MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
 {
 }
 
-bool MessageStoreModule::init(const std::string& dir, const bool async, const bool force)
+bool MessageStoreModule::init(const Options* options)
 {
-	return store->init(dir, async, force);
+	TRANSFER_EXCEPTION(return store->init(options));
 }
 
 void MessageStoreModule::create(PersistableQueue& queue)
 {
-    store->create(queue);
+    TRANSFER_EXCEPTION(store->create(queue));
 }
 
 void MessageStoreModule::destroy(PersistableQueue& queue)
 {
-    store->destroy(queue);
+    TRANSFER_EXCEPTION(store->destroy(queue));
 }
 
 void MessageStoreModule::create(const PersistableExchange& exchange)
 {
-    store->create(exchange);
+    TRANSFER_EXCEPTION(store->create(exchange));
 }
 
 void MessageStoreModule::destroy(const PersistableExchange& exchange)
 {
-    store->destroy(exchange);
+    TRANSFER_EXCEPTION(store->destroy(exchange));
 }
 
 void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q, 
                               const std::string& k, const framing::FieldTable& a)
 {
-    store->bind(e, q, k, a);
+    TRANSFER_EXCEPTION(store->bind(e, q, k, a));
 }
 
 void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q, 
                                 const std::string& k, const framing::FieldTable& a)
 {
-    store->unbind(e, q, k, a);
+    TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
 }
 
 void MessageStoreModule::recover(RecoveryManager& registry)
 {
-    store->recover(registry);
+    TRANSFER_EXCEPTION(store->recover(registry));
 }
 
 void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg)
 {
-    store->stage(msg);
+    TRANSFER_EXCEPTION(store->stage(msg));
 }
 
 void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
 {
-    store->destroy(msg);
+    TRANSFER_EXCEPTION(store->destroy(msg));
 }
 
 void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
 {
-    store->appendContent(msg, data);
+    TRANSFER_EXCEPTION(store->appendContent(msg, data));
 }
 
 void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue, 
      intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length)
 {
-    store->loadContent(queue, msg, data, offset, length);
+    TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length));
 }
 
 void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
 {
-    store->enqueue(ctxt, msg, queue);
+    TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue));
 }
 
 void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
 {
-    store->dequeue(ctxt, msg, queue);
+    TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
 }
 
 void MessageStoreModule::flush(const qpid::broker::PersistableQueue& queue)
 {
-    store->flush(queue);
+    TRANSFER_EXCEPTION(store->flush(queue));
 }
 
 u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue)
 {
-    return store->outstandingQueueAIO(queue);
+    TRANSFER_EXCEPTION(return store->outstandingQueueAIO(queue));
 }
 
 std::auto_ptr<TransactionContext> MessageStoreModule::begin()
 {
-    return store->begin();
+    TRANSFER_EXCEPTION(return store->begin());
 }
 
 std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid)
 {
-    return store->begin(xid);
+    TRANSFER_EXCEPTION(return store->begin(xid));
 }
 
 void MessageStoreModule::prepare(TPCTransactionContext& txn)
 {
-    store->prepare(txn);
+    TRANSFER_EXCEPTION(store->prepare(txn));
 }
 
 void MessageStoreModule::commit(TransactionContext& ctxt)
 {
-    store->commit(ctxt);
+    TRANSFER_EXCEPTION(store->commit(ctxt));
 }
 
 void MessageStoreModule::abort(TransactionContext& ctxt)
 {
-    store->abort(ctxt);
+    TRANSFER_EXCEPTION(store->abort(ctxt));
 }
 
 void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids)
 {
-    store->collectPreparedXids(xids);
+    TRANSFER_EXCEPTION(store->collectPreparedXids(xids));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Tue Dec  4 13:54:03 2007
@@ -38,7 +38,7 @@
 public:
     MessageStoreModule(const std::string& name);
 
-	bool init(const std::string& dir, const bool async, const bool force);
+	bool init(const Options* options);
     std::auto_ptr<TransactionContext> begin();
     std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     void prepare(TPCTransactionContext& txn);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Dec  4 13:54:03 2007
@@ -49,7 +49,7 @@
 
 NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
 
-bool NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/, const bool /*force*/) {return true;}
+bool NullMessageStore::init(const Options* /*options*/) {return true;}
 
 void NullMessageStore::create(PersistableQueue& queue)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=601099&r1=601098&r2=601099&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Dec  4 13:54:03 2007
@@ -38,7 +38,7 @@
 public:
     NullMessageStore(bool warn = false);
 
-    virtual bool init(const std::string& dir, const bool async, const bool force);
+    virtual bool init(const Options* options);
     virtual std::auto_ptr<TransactionContext> begin();
     virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     virtual void prepare(TPCTransactionContext& txn);