You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2010/04/15 17:57:46 UTC
svn commit: r934463 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ tests/
Author: kpvdr
Date: Thu Apr 15 15:57:46 2010
New Revision: 934463
URL: http://svn.apache.org/viewvc?rev=934463&view=rev
Log:
Implementation of QPID-2509 (Remove message staging from C++ broker)
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Apr 15 15:57:46 2010
@@ -81,7 +81,6 @@ Broker::Options::Options(const std::stri
workerThreads(5),
maxConnections(500),
connectionBacklog(10),
- stagingThreshold(5000000),
enableMgmt(1),
mgmtPubInterval(10),
queueCleanInterval(60*10),//10 minutes
@@ -113,7 +112,6 @@ Broker::Options::Options(const std::stri
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
- ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
@@ -178,7 +176,6 @@ Broker::Broker(const Broker::Options& co
mgmtObject->set_workerThreads(conf.workerThreads);
mgmtObject->set_maxConns(conf.maxConnections);
mgmtObject->set_connBacklog(conf.connectionBacklog);
- mgmtObject->set_stagingThreshold(conf.stagingThreshold);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
@@ -223,8 +220,7 @@ Broker::Broker(const Broker::Options& co
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
- conf.stagingThreshold);
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
store->recover(recoverer);
}
else {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Apr 15 15:57:46 2010
@@ -99,7 +99,6 @@ public:
int workerThreads;
int maxConnections;
int connectionBacklog;
- uint64_t stagingThreshold;
bool enableMgmt;
uint16_t mgmtPubInterval;
uint16_t queueCleanInterval;
@@ -205,7 +204,6 @@ public:
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
LinkRegistry& getLinks() { return links; }
- uint64_t getStagingThreshold() { return config.stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Apr 15 15:57:46 2010
@@ -46,7 +46,6 @@ class ConnectionState : public Connectio
framemax(65535),
heartbeat(0),
heartbeatmax(120),
- stagingThreshold(broker.getStagingThreshold()),
federationLink(true),
clientSupportsThrottling(false),
clusterOrderOut(0)
@@ -57,12 +56,10 @@ class ConnectionState : public Connectio
uint32_t getFrameMax() const { return framemax; }
uint16_t getHeartbeat() const { return heartbeat; }
uint16_t getHeartbeatMax() const { return heartbeatmax; }
- uint64_t getStagingThreshold() const { return stagingThreshold; }
void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
- void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
virtual void setUserId(const string& uid) { userId = uid; }
const string& getUserId() const { return userId; }
@@ -107,7 +104,6 @@ class ConnectionState : public Connectio
uint32_t framemax;
uint16_t heartbeat;
uint16_t heartbeatmax;
- uint64_t stagingThreshold;
string userId;
string url;
bool federationLink;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Apr 15 15:57:46 2010
@@ -36,8 +36,8 @@ namespace
const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
- state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
+MessageBuilder::MessageBuilder(MessageStore* const _store) :
+ state(DORMANT), store(_store) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -68,29 +68,13 @@ void MessageBuilder::handle(AMQFrame& fr
default:
throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
}
- if (staging) {
- intrusive_ptr<const PersistableMessage> cpmsg = boost::static_pointer_cast<const PersistableMessage>(message);
- store->appendContent(cpmsg, frame.castBody<AMQContentBody>()->getData());
- } else {
- message->getFrames().append(frame);
- //have we reached the staging limit? if so stage message and release content
- if (state == CONTENT
- && stagingThreshold
- && message->getFrames().getContentSize() >= stagingThreshold
- && !NullMessageStore::isNullStore(store)
- && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
- {
- message->releaseContent();
- staging = true;
- }
- }
+ message->getFrames().append(frame);
}
void MessageBuilder::end()
{
message = 0;
state = DORMANT;
- staging = false;
}
void MessageBuilder::start(const SequenceNumber& id)
@@ -98,7 +82,6 @@ void MessageBuilder::start(const Sequenc
message = intrusive_ptr<Message>(new Message(id));
message->setStore(store);
state = METHOD;
- staging = false;
}
namespace {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Thu Apr 15 15:57:46 2010
@@ -35,8 +35,7 @@ namespace qpid {
class MessageBuilder : public framing::FrameHandler{
public:
- QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store,
- uint64_t stagingThreshold);
+ QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);
boost::intrusive_ptr<Message> getMessage() { return message; }
QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
@@ -46,8 +45,6 @@ namespace qpid {
State state;
boost::intrusive_ptr<Message> message;
MessageStore* const store;
- const uint64_t stagingThreshold;
- bool staging;
void checkType(uint8_t expected, uint8_t actual);
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu Apr 15 15:57:46 2010
@@ -35,17 +35,16 @@ namespace qpid {
namespace broker {
RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
- DtxManager& _dtxMgr, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+ DtxManager& _dtxMgr)
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
class RecoverableMessageImpl : public RecoverableMessage
{
intrusive_ptr<Message> msg;
- const uint64_t stagingThreshold;
public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold);
+ RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
@@ -130,7 +129,7 @@ RecoverableMessage::shared_ptr RecoveryM
{
boost::intrusive_ptr<Message> message(new Message());
message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
@@ -161,16 +160,16 @@ void RecoveryManagerImpl::recoveryComple
exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
}
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
+RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
{
if (!msg->isPersistent()) {
msg->forcePersistent(); // set so that message will get dequeued from store.
}
}
-bool RecoverableMessageImpl::loadContent(uint64_t available)
+bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
{
- return !stagingThreshold || available < stagingThreshold;
+ return true;
}
void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Thu Apr 15 15:57:46 2010
@@ -36,10 +36,9 @@ namespace broker {
ExchangeRegistry& exchanges;
LinkRegistry& links;
DtxManager& dtxMgr;
- const uint64_t stagingThreshold;
public:
RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
- DtxManager& dtxMgr, uint64_t stagingThreshold);
+ DtxManager& dtxMgr);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Apr 15 15:57:46 2010
@@ -58,7 +58,7 @@ SessionState::SessionState(
broker(b), handler(&h),
semanticState(*this, *this),
adapter(semanticState),
- msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+ msgBuilder(&broker.getStore()),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
mgmtObject(0),
rateFlowcontrol(0)
Modified: qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=934463&r1=934462&r2=934463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Thu Apr 15 15:57:46 2010
@@ -97,7 +97,7 @@ QPID_AUTO_TEST_SUITE(MessageBuilderTestS
QPID_AUTO_TEST_CASE(testHeaderOnly)
{
- MessageBuilder builder(0, 0);
+ MessageBuilder builder(0);
builder.start(SequenceNumber());
std::string exchange("builder-exchange");
@@ -120,7 +120,7 @@ QPID_AUTO_TEST_CASE(testHeaderOnly)
QPID_AUTO_TEST_CASE(test1ContentFrame)
{
- MessageBuilder builder(0, 0);
+ MessageBuilder builder(0);
builder.start(SequenceNumber());
std::string data("abcdefg");
@@ -153,7 +153,7 @@ QPID_AUTO_TEST_CASE(test1ContentFrame)
QPID_AUTO_TEST_CASE(test2ContentFrames)
{
- MessageBuilder builder(0, 0);
+ MessageBuilder builder(0);
builder.start(SequenceNumber());
std::string data1("abcdefg");
@@ -185,67 +185,6 @@ QPID_AUTO_TEST_CASE(test2ContentFrames)
BOOST_CHECK(builder.getMessage());
BOOST_CHECK(builder.getMessage()->getFrames().isComplete());
}
-
-QPID_AUTO_TEST_CASE(testStaging)
-{
- MockMessageStore store;
- MessageBuilder builder(&store, 5);
- builder.start(SequenceNumber());
-
- std::string data1("abcdefg");
- std::string data2("hijklmn");
- std::string exchange("builder-exchange");
- std::string key("builder-exchange");
-
- AMQFrame method(MessageTransferBody(ProtocolVersion(), exchange, 0, 0));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content1((AMQContentBody(data1)));
- AMQFrame content2((AMQContentBody(data2)));
-
- header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
- header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
- builder.handle(method);
- builder.handle(header);
-
- store.expectStage(*builder.getMessage());
- builder.handle(content1);
- BOOST_CHECK(store.expectationsMet());
- BOOST_CHECK_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId());
-
- store.expectAppendContent(*builder.getMessage(), data2);
- builder.handle(content2);
- BOOST_CHECK(store.expectationsMet());
- //were the content frames dropped?
- BOOST_CHECK(!builder.getMessage()->isContentLoaded());
-}
-
-QPID_AUTO_TEST_CASE(testNoManagementStaging)
-{
- // Make sure management messages don't stage
- MockMessageStore store;
- MessageBuilder builder(&store, 5);
- builder.start(SequenceNumber());
-
- std::string data1("abcdefg");
- std::string exchange("qpid.management");
- std::string key("builder-exchange");
-
- AMQFrame method(MessageTransferBody(ProtocolVersion(), exchange, 0, 0));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content1((AMQContentBody(data1)));
-
- header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size());
- header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
- builder.handle(method);
- builder.handle(header);
-
- builder.handle(content1);
- BOOST_CHECK(store.expectationsMet());
- BOOST_CHECK_EQUAL((uint64_t) 0, builder.getMessage()->getPersistenceId());
-}
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org