You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/12/06 13:01:42 UTC

svn commit: r483046 - in /incubator/qpid/trunk/qpid/cpp: lib/broker/ tests/

Author: gsim
Date: Wed Dec  6 04:01:40 2006
New Revision: 483046

URL: http://svn.apache.org/viewvc?view=rev&rev=483046
Log:
Added new configuration option for staging threshold (size above which messages 
will be written to disk as content arrives rather than accumulating that content 
in memory). Pass this through to all channels and to the store on recovery.


Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp Wed Dec  6 04:01:40 2006
@@ -31,7 +31,7 @@
 using namespace qpid::sys;
 
 
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
     id(_id), 
     out(_out), 
     currentDeliveryTag(1),
@@ -40,8 +40,8 @@
     prefetchCount(0),
     framesize(_framesize),
     tagGenerator("sgen"),
-    store(0),
-    messageBuilder(this){
+    store(_store),
+    messageBuilder(this, _store, _stagingThreshold){
 
     outstanding.reset();
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h Wed Dec  6 04:01:40 2006
@@ -36,7 +36,7 @@
 #include <NameGenerator.h>
 #include <Prefetch.h>
 #include <BrokerQueue.h>
-#include <TransactionalStore.h>
+#include <MessageStore.h>
 #include <TxAck.h>
 #include <TxBuffer.h>
 #include <TxPublish.h>
@@ -85,7 +85,7 @@
             qpid::sys::Mutex deliveryLock;
             TxBuffer txBuffer;
             AccumulatedAck accumulatedAck;
-            TransactionalStore* store;
+            MessageStore* const store;
             MessageBuilder messageBuilder;//builder for in-progress message
             Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
 
@@ -95,7 +95,8 @@
             bool checkPrefetch(Message::shared_ptr& msg);
         
         public:
-            Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
+            Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize, 
+                    MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
             ~Channel();
             inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
             inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp Wed Dec  6 04:01:40 2006
@@ -32,6 +32,7 @@
     maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500),
     connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10),
     store('s', "store", "Set the message store module to use (default='' which implies no store)", ""),
+    stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000),
     help("help", "Print usage information", false),
     version("version", "Print version information", false)
 {
@@ -41,6 +42,7 @@
     options.push_back(&maxConnections);
     options.push_back(&connectionBacklog);
     options.push_back(&store);
+    options.push_back(&stagingThreshold);
     options.push_back(&help);
     options.push_back(&version);
 }
@@ -106,6 +108,11 @@
     return store.getValue();
 }
 
+long Configuration::getStagingThreshold() const { // accessor for the value of the --staging-threshold option
+    return stagingThreshold.getValue(); // 5000000 (the constructor default) unless the option was set
+}
+
+
 Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
     flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
 
@@ -190,6 +197,28 @@
 
 void Configuration::IntOption::setValue(const std::string& _value){
     value = atoi(_value.c_str());
+}
+
+// Long Option:
+
+Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) : // form with a short flag and a long name
+    Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} // the current value starts out equal to the default
+
+Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) : // form taking a name only (no flag argument)
+    Option(_name,_desc), defaultValue(_value), value(_value) {} // the current value starts out equal to the default
+
+Configuration::LongOption::~LongOption(){}
+
+long Configuration::LongOption::getValue() const { // returns the currently stored value
+    return value;
+}
+
+bool Configuration::LongOption::needsValue() const { // always reports true
+    return true; // NOTE(review): presumably tells the parser that an argument must follow this option -- confirm in parse()
+}
+
+void Configuration::LongOption::setValue(const std::string& _value){ // stores the numeric value parsed from the given text
+    value = atol(_value.c_str()); // atol reports no errors: text with no leading number yields 0
 }
 
 // Bool Option:

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h Wed Dec  6 04:01:40 2006
@@ -63,6 +63,20 @@
                 virtual void setValue(int _value) { value = _value; }
             };
 
+            class LongOption : public Option{ // command-line option holding a long value (used for staging-threshold)
+                const long defaultValue; // value supplied at construction
+                long value; // long, not int: a value above INT_MAX must be stored without truncation
+            public:
+                LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0);
+                LongOption(const std::string& name, const std::string& desc, const long value = 0);
+                virtual ~LongOption();
+
+                long getValue() const; // returns the current value
+                virtual bool needsValue() const;
+                virtual void setValue(const std::string& value); // sets the value from its text form
+                virtual void setValue(long _value) { value = _value; } // takes long so callers passing a long are not narrowed to int
+            };
+
             class StringOption : public Option{
                 const std::string defaultValue;
                 std::string value;
@@ -96,6 +110,7 @@
             IntOption maxConnections;
             IntOption connectionBacklog;
             StringOption store;
+            LongOption stagingThreshold;
             BoolOption help;
             BoolOption version;
             char const *programName;
@@ -123,6 +138,7 @@
             int getMaxConnections() const;
             int getConnectionBacklog() const;
             const std::string& getStore() const;
+            long getStagingThreshold() const;
 
             void setHelp(bool b) { help.setValue(b); }
             void setVersion(bool b) { version.setValue(b); }
@@ -132,6 +148,7 @@
             void setMaxConnections(int i) { maxConnections.setValue(i); }
             void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
             void setStore(const std::string& s) { store.setValue(s); }
+            void setStagingThreshold(long l) { stagingThreshold.setValue(l); }
 
             void usage();
         };

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp Wed Dec  6 04:01:40 2006
@@ -73,3 +73,7 @@
     } while(queues.find(name) != queues.end());
     return name;
 }
+
+MessageStore* const QueueRegistry::getStore() const { // NOTE(review): the top-level const on the returned pointer has no effect for callers
+    return store; // hands back the registry's store member unchanged
+}

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h Wed Dec  6 04:01:40 2006
@@ -74,6 +74,12 @@
      */
     string generateName();
 
+    /**
+     * Return the message store used.
+     */
+    MessageStore* const getStore() const;
+
+
   private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
     QueueMap queues;

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp Wed Dec  6 04:01:40 2006
@@ -39,9 +39,9 @@
 const std::string amq_match("amq.match");
 }
 
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) : 
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) : 
     store(_store.empty() ? (MessageStore*)  new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), 
-    queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+    queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10)
 {
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);
@@ -51,7 +51,8 @@
 
     if(store.get()) {
         RecoveryManager recoverer(queues, exchanges);
-        store->recover(recoverer);
+        MessageStoreSettings storeSettings = { settings.stagingThreshold };
+        store->recover(recoverer, &storeSettings);
     }
 
     cleaner.start();
@@ -59,7 +60,7 @@
 
 SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
 {
-    return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+    return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings);
 }
 
 SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h Wed Dec  6 04:01:40 2006
@@ -31,6 +31,7 @@
 #include <sys/SessionHandler.h>
 #include <sys/SessionHandlerFactory.h>
 #include <sys/TimeoutHandler.h>
+#include <SessionHandlerImpl.h>
 #include <memory>
 
 namespace qpid {
@@ -41,10 +42,10 @@
             std::auto_ptr<MessageStore> store;
             QueueRegistry queues;
             ExchangeRegistry exchanges;
-            const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+            const Settings settings;
             AutoDelete cleaner;
         public:
-            SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000);
+            SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000);
             virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
             virtual ~SessionHandlerFactoryImpl();
         };

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Wed Dec  6 04:01:40 2006
@@ -35,7 +35,7 @@
                                        QueueRegistry* _queues, 
                                        ExchangeRegistry* _exchanges, 
                                        AutoDelete* _cleaner,
-                                       const u_int32_t _timeout) :
+                                       const Settings& _settings) :
     context(_context), 
 // AMQP version management change - kpvdr 2006-11-17
 // TODO: Make this class version-aware and link these hard-wired numbers to that version
@@ -43,7 +43,7 @@
     queues(_queues), 
     exchanges(_exchanges),
     cleaner(_cleaner),
-    timeout(_timeout),
+    settings(_settings),
     basicHandler(new BasicHandlerImpl(this)),
     channelHandler(new ChannelHandlerImpl(this)),
     connectionHandler(new ConnectionHandlerImpl(this)),
@@ -200,7 +200,8 @@
 
 
 void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
-    parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+    parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, 
+                                            parent->queues->getStore(), parent->settings.stagingThreshold);
     parent->client.getChannel().openOk(channel);
 } 
         
@@ -262,7 +263,7 @@
 	queue = parent->getQueue(name, channel);
     } else {
 	std::pair<Queue::shared_ptr, bool> queue_created =  
-            parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+            parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h Wed Dec  6 04:01:40 2006
@@ -60,6 +60,14 @@
     const char* what() const throw() { return text.c_str(); }
 };
 
+class Settings { // read-only bundle of broker settings; the factory passes it to each SessionHandlerImpl it creates
+public:
+    const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+    const u_int64_t stagingThreshold;//message size above which content is written to disk as it arrives rather than held in memory
+
+    Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} // both members are const, so they are fixed at construction
+};
+
 class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, 
                            public virtual qpid::framing::AMQP_ServerOperations, 
                            public virtual ConnectionToken
@@ -72,7 +80,7 @@
     QueueRegistry* queues;
     ExchangeRegistry* const exchanges;
     AutoDelete* const cleaner;
-    const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+    const Settings settings;
 
     std::auto_ptr<BasicHandler> basicHandler;
     std::auto_ptr<ChannelHandler> channelHandler;
@@ -104,7 +112,7 @@
     
   public:
     SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, 
-                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);
     virtual void received(qpid::framing::AMQFrame* frame);
     virtual void initiated(qpid::framing::ProtocolInitiation* header);
     virtual void idleOut();

Modified: incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp Wed Dec  6 04:01:40 2006
@@ -32,6 +32,7 @@
     CPPUNIT_TEST(testPortLongForm);
     CPPUNIT_TEST(testPortShortForm);
     CPPUNIT_TEST(testStore);
+    CPPUNIT_TEST(testStagingThreshold);
     CPPUNIT_TEST(testVarious);
     CPPUNIT_TEST_SUITE_END();
 
@@ -68,6 +69,15 @@
         conf.parse("ignore", 3, argv);
         std::string expected("my-store-module.so");
         CPPUNIT_ASSERT_EQUAL(expected, conf.getStore());
+    }
+
+    void testStagingThreshold() // checks that --staging-threshold given on the command line is returned by getStagingThreshold()
+    {
+        Configuration conf;
+        char* argv[] = {"ignore", "--staging-threshold", "123456789"};
+        conf.parse("ignore", 3, argv);
+        long expected = 123456789; // declared long to match the return type of getStagingThreshold() in the assertion below
+        CPPUNIT_ASSERT_EQUAL(expected, conf.getStagingThreshold()); // NOTE(review): 123456789 fits in an int, so values beyond int range are not exercised here
     }
 
     void testVarious()