You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2010/04/15 17:57:46 UTC

svn commit: r934463 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ tests/

Author: kpvdr
Date: Thu Apr 15 15:57:46 2010
New Revision: 934463

URL: http://svn.apache.org/viewvc?rev=934463&view=rev
Log:
Implementation of QPID-2509 (Remove message staging from C++ broker)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Apr 15 15:57:46 2010
@@ -81,7 +81,6 @@ Broker::Options::Options(const std::stri
     workerThreads(5),
     maxConnections(500),
     connectionBacklog(10),
-    stagingThreshold(5000000),
     enableMgmt(1),
     mgmtPubInterval(10),
     queueCleanInterval(60*10),//10 minutes
@@ -113,7 +112,6 @@ Broker::Options::Options(const std::stri
         ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
         ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
         ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
-        ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
         ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
@@ -178,7 +176,6 @@ Broker::Broker(const Broker::Options& co
         mgmtObject->set_workerThreads(conf.workerThreads);
         mgmtObject->set_maxConns(conf.maxConnections);
         mgmtObject->set_connBacklog(conf.connectionBacklog);
-        mgmtObject->set_stagingThreshold(conf.stagingThreshold);
         mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
         mgmtObject->set_version(qpid::version);
         if (dataDir.isEnabled())
@@ -223,8 +220,7 @@ Broker::Broker(const Broker::Options& co
         // The cluster plug-in will setRecovery(false) on all but the first
         // broker to join a cluster.
         if (getRecovery()) {
-            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
-                                          conf.stagingThreshold);
+            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
             store->recover(recoverer);
         }
         else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Apr 15 15:57:46 2010
@@ -99,7 +99,6 @@ public:
         int workerThreads;
         int maxConnections;
         int connectionBacklog;
-        uint64_t stagingThreshold;
         bool enableMgmt;
         uint16_t mgmtPubInterval;
         uint16_t queueCleanInterval;
@@ -205,7 +204,6 @@ public:
     QueueRegistry& getQueues() { return queues; }
     ExchangeRegistry& getExchanges() { return exchanges; }
     LinkRegistry& getLinks() { return links; }
-    uint64_t getStagingThreshold() { return config.stagingThreshold; }
     DtxManager& getDtxManager() { return dtxManager; }
     DataDir& getDataDir() { return dataDir; }
     Options& getOptions() { return config; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Apr 15 15:57:46 2010
@@ -46,7 +46,6 @@ class ConnectionState : public Connectio
         framemax(65535),
         heartbeat(0),
         heartbeatmax(120),
-        stagingThreshold(broker.getStagingThreshold()),
         federationLink(true),
         clientSupportsThrottling(false),
         clusterOrderOut(0)
@@ -57,12 +56,10 @@ class ConnectionState : public Connectio
     uint32_t getFrameMax() const { return framemax; }
     uint16_t getHeartbeat() const { return heartbeat; }
     uint16_t getHeartbeatMax() const { return heartbeatmax; }
-    uint64_t getStagingThreshold() const { return stagingThreshold; }
 
     void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
     void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
-    void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
 
     virtual void setUserId(const string& uid) {  userId = uid; }
     const string& getUserId() const { return userId; }
@@ -107,7 +104,6 @@ class ConnectionState : public Connectio
     uint32_t framemax;
     uint16_t heartbeat;
     uint16_t heartbeatmax;
-    uint64_t stagingThreshold;
     string userId;
     string url;
     bool federationLink;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Apr 15 15:57:46 2010
@@ -36,8 +36,8 @@ namespace
     const std::string QPID_MANAGEMENT("qpid.management");
 }
 
-MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
-    state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
+MessageBuilder::MessageBuilder(MessageStore* const _store) :
+    state(DORMANT), store(_store) {}
 
 void MessageBuilder::handle(AMQFrame& frame)
 {
@@ -68,29 +68,13 @@ void MessageBuilder::handle(AMQFrame& fr
     default:
         throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
     }
-    if (staging) {
-        intrusive_ptr<const PersistableMessage> cpmsg = boost::static_pointer_cast<const PersistableMessage>(message);
-        store->appendContent(cpmsg, frame.castBody<AMQContentBody>()->getData());
-    } else {
-        message->getFrames().append(frame);
-        //have we reached the staging limit? if so stage message and release content
-        if (state == CONTENT
-            && stagingThreshold
-            && message->getFrames().getContentSize() >= stagingThreshold
-            && !NullMessageStore::isNullStore(store)
-            && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
-        {
-            message->releaseContent();
-            staging = true;
-        }
-    }
+    message->getFrames().append(frame);
 }
 
 void MessageBuilder::end()
 {
     message = 0;
     state = DORMANT;
-    staging = false;
 }
 
 void MessageBuilder::start(const SequenceNumber& id)
@@ -98,7 +82,6 @@ void MessageBuilder::start(const Sequenc
     message = intrusive_ptr<Message>(new Message(id));
     message->setStore(store);
     state = METHOD;
-    staging = false;
 }
 
 namespace {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Thu Apr 15 15:57:46 2010
@@ -35,8 +35,7 @@ namespace qpid {
 
         class MessageBuilder : public framing::FrameHandler{
         public:
-            QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store,
-                                              uint64_t stagingThreshold);
+            QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
             QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);
             boost::intrusive_ptr<Message> getMessage() { return message; }
             QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
@@ -46,8 +45,6 @@ namespace qpid {
             State state;
             boost::intrusive_ptr<Message> message;
             MessageStore* const store;
-            const uint64_t stagingThreshold;
-            bool staging;
 
             void checkType(uint8_t expected, uint8_t actual);
         };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu Apr 15 15:57:46 2010
@@ -35,17 +35,16 @@ namespace qpid {
 namespace broker {
 
 RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
-                                         DtxManager& _dtxMgr, uint64_t _stagingThreshold) 
-    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+                                         DtxManager& _dtxMgr)
+    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {}
 
 RecoveryManagerImpl::~RecoveryManagerImpl() {}
 
 class RecoverableMessageImpl : public RecoverableMessage
 {
     intrusive_ptr<Message> msg;
-    const uint64_t stagingThreshold;
 public:
-    RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); 
+    RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
     ~RecoverableMessageImpl() {};
     void setPersistenceId(uint64_t id);
     void setRedelivered();
@@ -130,7 +129,7 @@ RecoverableMessage::shared_ptr RecoveryM
 {
     boost::intrusive_ptr<Message> message(new Message());
     message->decodeHeader(buffer);
-    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));    
+    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
 }
 
 RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, 
@@ -161,16 +160,16 @@ void RecoveryManagerImpl::recoveryComple
     exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
 }
 
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) 
+RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
 {
     if (!msg->isPersistent()) {
         msg->forcePersistent(); // set so that message will get dequeued from store.
     }
 }
 
-bool RecoverableMessageImpl::loadContent(uint64_t available)
+bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
 {
-    return !stagingThreshold || available < stagingThreshold;
+    return true;
 }
 
 void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Thu Apr 15 15:57:46 2010
@@ -36,10 +36,9 @@ namespace broker {
         ExchangeRegistry& exchanges;
         LinkRegistry& links;
         DtxManager& dtxMgr;
-        const uint64_t stagingThreshold;
     public:
         RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
-                            DtxManager& dtxMgr, uint64_t stagingThreshold);
+                            DtxManager& dtxMgr);
         ~RecoveryManagerImpl();
 
         RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Apr 15 15:57:46 2010
@@ -58,7 +58,7 @@ SessionState::SessionState(
       broker(b), handler(&h),
       semanticState(*this, *this),
       adapter(semanticState),
-      msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+      msgBuilder(&broker.getStore()),
       enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
       mgmtObject(0),
       rateFlowcontrol(0)

Modified: qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Thu Apr 15 15:57:46 2010
@@ -97,7 +97,7 @@ QPID_AUTO_TEST_SUITE(MessageBuilderTestS
 
 QPID_AUTO_TEST_CASE(testHeaderOnly)
 {
-    MessageBuilder builder(0, 0);
+    MessageBuilder builder(0);
     builder.start(SequenceNumber());
 
     std::string exchange("builder-exchange");
@@ -120,7 +120,7 @@ QPID_AUTO_TEST_CASE(testHeaderOnly)
 
 QPID_AUTO_TEST_CASE(test1ContentFrame)
 {
-    MessageBuilder builder(0, 0);
+    MessageBuilder builder(0);
     builder.start(SequenceNumber());
 
     std::string data("abcdefg");
@@ -153,7 +153,7 @@ QPID_AUTO_TEST_CASE(test1ContentFrame)
 
 QPID_AUTO_TEST_CASE(test2ContentFrames)
 {
-    MessageBuilder builder(0, 0);
+    MessageBuilder builder(0);
     builder.start(SequenceNumber());
 
     std::string data1("abcdefg");
@@ -185,67 +185,6 @@ QPID_AUTO_TEST_CASE(test2ContentFrames)
     BOOST_CHECK(builder.getMessage());
     BOOST_CHECK(builder.getMessage()->getFrames().isComplete());
 }
-
-QPID_AUTO_TEST_CASE(testStaging)
-{
-    MockMessageStore store;
-    MessageBuilder builder(&store, 5);
-    builder.start(SequenceNumber());
-
-    std::string data1("abcdefg");
-    std::string data2("hijklmn");
-    std::string exchange("builder-exchange");
-    std::string key("builder-exchange");
-
-    AMQFrame method(MessageTransferBody(ProtocolVersion(), exchange, 0, 0));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content1((AMQContentBody(data1)));
-    AMQFrame content2((AMQContentBody(data2)));
-
-    header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
-    header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
-    builder.handle(method);
-    builder.handle(header);
-
-    store.expectStage(*builder.getMessage());
-    builder.handle(content1);
-    BOOST_CHECK(store.expectationsMet());
-    BOOST_CHECK_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId());
-
-    store.expectAppendContent(*builder.getMessage(), data2);
-    builder.handle(content2);
-    BOOST_CHECK(store.expectationsMet());
-    //were the content frames dropped?
-    BOOST_CHECK(!builder.getMessage()->isContentLoaded());
-}
-
-QPID_AUTO_TEST_CASE(testNoManagementStaging)
-{
-    // Make sure management messages don't stage
-    MockMessageStore store;
-    MessageBuilder builder(&store, 5);
-    builder.start(SequenceNumber());
-
-    std::string data1("abcdefg");
-    std::string exchange("qpid.management");
-    std::string key("builder-exchange");
-
-    AMQFrame method(MessageTransferBody(ProtocolVersion(), exchange, 0, 0));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content1((AMQContentBody(data1)));
-
-    header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size());
-    header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
-    builder.handle(method);
-    builder.handle(header);
-
-    builder.handle(content1);
-    BOOST_CHECK(store.expectationsMet());
-    BOOST_CHECK_EQUAL((uint64_t) 0, builder.getMessage()->getPersistenceId());
-}
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org