You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/15 19:28:32 UTC
svn commit: r496425 - in /incubator/qpid/branches/qpid.0-9/cpp: docs/api/
lib/broker/ lib/common/framing/ tests/
Author: aconway
Date: Mon Jan 15 10:28:29 2007
New Revision: 496425
URL: http://svn.apache.org/viewvc?view=rev&rev=496425
Log:
* Refactor: Moved major broker components (exchanges, queues etc.) from
class SessionHandlerImplFactory to more logical class Broker.
Modified:
incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
Modified: incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen Mon Jan 15 10:28:29 2007
@@ -300,7 +300,7 @@
# in case and if your file system supports case sensitive file names. Windows
# and Mac users are advised to set this option to NO.
-CASE_SENSE_NAMES = YES
+CASE_SENSE_NAMES = NO
# If the HIDE_SCOPE_NAMES tag is set to NO (the default) then Doxygen
# will show members with their full class and namespace scopes in the
@@ -560,7 +560,7 @@
# Note: To get rid of all source code in the generated output, make sure also
# VERBATIM_HEADERS is set to NO.
-SOURCE_BROWSER = NO
+SOURCE_BROWSER = YES
# Setting the INLINE_SOURCES tag to YES will include the body
# of functions and classes directly in the documentation.
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Mon Jan 15 10:28:29 2007
@@ -20,19 +20,61 @@
*/
#include <iostream>
#include <memory>
-#include <Broker.h>
-
-using namespace qpid::broker;
-using namespace qpid::sys;
+#include "AMQFrame.h"
+#include "DirectExchange.h"
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+#include "MessageStoreModule.h"
+#include "NullMessageStore.h"
+#include "ProtocolInitiation.h"
+#include "SessionHandlerImpl.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/SessionHandlerFactory.h"
+#include "sys/TimeoutHandler.h"
+
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
Broker::Broker(const Configuration& config) :
acceptor(Acceptor::create(config.getPort(),
config.getConnectionBacklog(),
config.getWorkerThreads(),
config.isTrace())),
- factory(config.getStore())
-{ }
+ queues(store.get()),
+ timeout(30000),
+ stagingThreshold(0),
+ cleaner(&queues, timeout/10),
+ factory(*this)
+{
+ if (config.getStore().empty())
+ store.reset(new NullMessageStore());
+ else
+ store.reset(new MessageStoreModule(config.getStore()));
+
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
+
+ if(store.get()) {
+ RecoveryManager recoverer(queues, exchanges);
+ MessageStoreSettings storeSettings = { getStagingThreshold() };
+ store->recover(recoverer, &storeSettings);
+ }
+
+ cleaner.start();
+}
Broker::shared_ptr Broker::create(int16_t port)
@@ -57,3 +99,6 @@
Broker::~Broker() { }
const int16_t Broker::DEFAULT_PORT(5672);
+
+
+}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h Mon Jan 15 10:28:29 2007
@@ -27,6 +27,8 @@
#include <sys/Runnable.h>
#include <sys/Acceptor.h>
#include <SharedObject.h>
+#include <MessageStore.h>
+#include <AutoDelete.h>
namespace qpid {
namespace broker {
@@ -69,13 +71,27 @@
/** Shut down the broker */
virtual void shutdown();
+ MessageStore& getStore() { return *store; }
+ QueueRegistry& getQueues() { return queues; }
+ ExchangeRegistry& getExchanges() { return exchanges; }
+ u_int32_t getTimeout() { return timeout; }
+ u_int64_t getStagingThreshold() { return stagingThreshold; }
+ AutoDelete& getCleaner() { return cleaner; }
+
private:
Broker(const Configuration& config);
+
qpid::sys::Acceptor::shared_ptr acceptor;
+ std::auto_ptr<MessageStore> store;
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ u_int32_t timeout;
+ u_int64_t stagingThreshold;
+ AutoDelete cleaner;
SessionHandlerFactoryImpl factory;
};
-}
-}
+
+}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp Mon Jan 15 10:28:29 2007
@@ -19,51 +19,25 @@
*
*/
#include <SessionHandlerFactoryImpl.h>
-
-#include <DirectExchange.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <MessageStoreModule.h>
-#include <NullMessageStore.h>
#include <SessionHandlerImpl.h>
-using namespace qpid::broker;
-using namespace qpid::sys;
+namespace qpid {
+namespace broker {
-namespace
-{
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-}
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) :
- store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)),
- queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10)
-{
- exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- exchanges.declare(amq_direct, DirectExchange::typeName);
- exchanges.declare(amq_topic, TopicExchange::typeName);
- exchanges.declare(amq_fanout, FanOutExchange::typeName);
- exchanges.declare(amq_match, HeadersExchange::typeName);
-
- if(store.get()) {
- RecoveryManager recoverer(queues, exchanges);
- MessageStoreSettings storeSettings = { settings.stagingThreshold };
- store->recover(recoverer, &storeSettings);
- }
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b)
+{}
- cleaner.start();
-}
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
{
- return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings);
+ broker.getCleaner().stop();
}
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+qpid::sys::SessionHandler*
+SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt)
{
- cleaner.stop();
+ return new SessionHandlerImpl(ctxt, broker);
}
+
+}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h Mon Jan 15 10:28:29 2007
@@ -21,37 +21,26 @@
#ifndef _SessionHandlerFactoryImpl_
#define _SessionHandlerFactoryImpl_
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <MessageStore.h>
-#include <QueueRegistry.h>
-#include <AMQFrame.h>
-#include <ProtocolInitiation.h>
-#include <sys/SessionContext.h>
-#include <sys/SessionHandler.h>
-#include <sys/SessionHandlerFactory.h>
-#include <sys/TimeoutHandler.h>
-#include <SessionHandlerImpl.h>
-#include <memory>
+#include "SessionHandlerFactory.h"
namespace qpid {
- namespace broker {
+namespace broker {
+class Broker;
- class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
- {
- std::auto_ptr<MessageStore> store;
- QueueRegistry queues;
- ExchangeRegistry exchanges;
- const Settings settings;
- AutoDelete cleaner;
- public:
- SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000);
- virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
- virtual ~SessionHandlerFactoryImpl();
- };
+class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory
+{
+ public:
+ SessionHandlerFactoryImpl(Broker& b);
+
+ virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+
+ virtual ~SessionHandlerFactoryImpl();
- }
-}
+ private:
+ Broker& broker;
+};
+
+}}
#endif
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp Mon Jan 15 10:28:29 2007
@@ -26,21 +26,22 @@
#include "assert.h"
using namespace boost;
-using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
using namespace qpid::sys;
-SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
- QueueRegistry* _queues,
- ExchangeRegistry* _exchanges,
- AutoDelete* _cleaner,
- const Settings& _settings) :
+namespace qpid {
+namespace broker {
+
+SessionHandlerImpl::SessionHandlerImpl(
+ SessionContext* _context, Broker& broker) :
+
context(_context),
- queues(_queues),
- exchanges(_exchanges),
- cleaner(_cleaner),
- settings(_settings),
+ client(0),
+ queues(broker.getQueues()),
+ exchanges(broker.getExchanges()),
+ cleaner(broker.getCleaner()),
+ settings(broker.getTimeout(), broker.getStagingThreshold()),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -49,10 +50,8 @@
txHandler(new TxHandlerImpl(this)),
messageHandler(new MessageHandlerImpl(this)),
framemax(65536),
- heartbeat(0){
-
- client =NULL;
-}
+ heartbeat(0)
+{}
SessionHandlerImpl::~SessionHandlerImpl(){
@@ -75,7 +74,7 @@
queue = getChannel(channel)->getDefaultQueue();
if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
} else {
- queue = queues->find(name);
+ queue = queues.find(name);
if (queue == 0) {
throw ChannelException( 404, "Queue not found: " + name);
}
@@ -85,7 +84,7 @@
Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
- return exchanges->get(name);
+ return exchanges.get(name);
}
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -96,8 +95,10 @@
switch(body->type())
{
case REQUEST_BODY:
+ // responder.received(frame);
case RESPONSE_BODY:
- case METHOD_BODY:
+ // requester.received(frame);
+ case METHOD_BODY: //
method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
try{
method->invoke(*this, channel);
@@ -164,7 +165,7 @@
}
for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
string name = (*i)->getName();
- queues->destroy(name);
+ queues.destroy(name);
exclusiveQueues.erase(i);
}
} catch(std::exception& e) {
@@ -221,7 +222,7 @@
parent->channels[channel] = new Channel(
parent->client->getProtocolVersion() , parent->context, channel,
- parent->framemax, parent->queues->getStore(),
+ parent->framemax, parent->queues.getStore(),
parent->settings.stagingThreshold);
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
@@ -251,12 +252,12 @@
const FieldTable& /*arguments*/){
if(passive){
- if(!parent->exchanges->get(exchange)){
+ if(!parent->exchanges.get(exchange)){
throw ChannelException(404, "Exchange not found: " + exchange);
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
if(!response.second && response.first->getType() != type){
throw ConnectionException(507, "Exchange already declared to be of type "
+ response.first->getType() + ", requested " + type);
@@ -288,7 +289,7 @@
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- parent->exchanges->destroy(exchange);
+ parent->exchanges.destroy(exchange);
if(!nowait) parent->client->getExchange().deleteOk(channel);
}
@@ -300,7 +301,7 @@
queue = parent->getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
+ parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -310,11 +311,11 @@
queue_created.first->create(arguments);
//add default binding:
- parent->exchanges->getDefault()->bind(queue, name, 0);
+ parent->exchanges.getDefault()->bind(queue, name, 0);
if (exclusive) {
parent->exclusiveQueues.push_back(queue);
} else if(autoDelete){
- parent->cleaner->add(queue);
+ parent->cleaner.add(queue);
}
}
}
@@ -332,7 +333,7 @@
const FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
if(exchange){
// kpvdr - cannot use this any longer as routingKey is now const
// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
@@ -369,7 +370,7 @@
}
count = q->getMessageCount();
q->destroy();
- parent->queues->destroy(queue);
+ parent->queues.destroy(queue);
}
if(!nowait) parent->client->getQueue().deleteOk(channel, count);
@@ -424,7 +425,7 @@
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
- Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
if(exchange){
Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
parent->getChannel(channel)->handlePublish(msg, exchange);
@@ -652,3 +653,4 @@
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
+}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h Mon Jan 15 10:28:29 2007
@@ -40,6 +40,7 @@
#include <sys/SessionHandler.h>
#include <sys/TimeoutHandler.h>
#include <TopicExchange.h>
+#include "Broker.h"
namespace qpid {
namespace broker {
@@ -77,11 +78,10 @@
qpid::sys::SessionContext* context;
qpid::framing::AMQP_ClientProxy* client;
- QueueRegistry* queues;
- ExchangeRegistry* const exchanges;
- AutoDelete* const cleaner;
- const Settings settings;
-
+ QueueRegistry& queues;
+ ExchangeRegistry& exchanges;
+ AutoDelete& cleaner;
+ Settings settings;
std::auto_ptr<BasicHandler> basicHandler;
std::auto_ptr<ChannelHandler> channelHandler;
std::auto_ptr<ConnectionHandler> connectionHandler;
@@ -112,8 +112,7 @@
Exchange::shared_ptr findExchange(const string& name);
public:
- SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
- ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);
+ SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker);
virtual void received(qpid::framing::AMQFrame* frame);
virtual void initiated(qpid::framing::ProtocolInitiation* header);
virtual void idleOut();
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp Mon Jan 15 10:28:29 2007
@@ -30,9 +30,9 @@
responseMark = request.responseMark;
}
-void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) {
+void Responder::sending(AMQResponseBody::Data& response) {
response.responseId = ++lastId;
- response.requestId = toRequest;
+ // response.requestId should have been set by caller.
response.batchOffset = 0;
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h Mon Jan 15 10:28:29 2007
@@ -40,7 +40,7 @@
void received(const AMQRequestBody::Data& request);
/** Called before sending a response to set respose data. */
- void sending(AMQResponseBody::Data& response, RequestId toRequest);
+ void sending(AMQResponseBody::Data& response);
/** Get the ID of the highest response acknowledged by the peer. */
ResponseId getResponseMark() { return responseMark; }
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp Mon Jan 15 10:28:29 2007
@@ -236,7 +236,8 @@
q.requestId = 1;
q.responseMark = 0;
r.received(q);
- r.sending(p, q.requestId);
+ p.requestId = q.requestId;
+ r.sending(p);
CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId);
CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId);
CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset);
@@ -245,9 +246,8 @@
q.requestId++;
q.responseMark = 1;
r.received(q);
- r.sending(p, q.requestId);
+ r.sending(p);
CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId);
- CPPUNIT_ASSERT_EQUAL(2ULL, p.requestId);
CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset);
CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark());