You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/05/12 19:04:08 UTC
svn commit: r655563 [1/3] - in /incubator/qpid/trunk/qpid:
cpp/managementgen/templates/ cpp/src/ cpp/src/qpid/amqp_0_10/
cpp/src/qpid/broker/ cpp/src/qpid/management/ cpp/src/qpid/sys/
cpp/src/qpid/sys/posix/ cpp/src/tests/ python/commands/ python/qpid...
Author: gsim
Date: Mon May 12 10:04:07 2008
New Revision: 655563
URL: http://svn.apache.org/viewvc?rev=655563&view=rev
Log:
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
a) Links can be created even if the remote broker is not reachable
b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable.
(Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies)
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolAccess.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
incubator/qpid/trunk/qpid/python/commands/qpid-config
incubator/qpid/trunk/qpid/python/commands/qpid-route
incubator/qpid/trunk/qpid/python/qpid/management.py
incubator/qpid/trunk/qpid/python/qpid/managementdata.py
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Mon May 12 10:04:07 2008
@@ -24,6 +24,7 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
namespace qpid {
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon May 12 10:04:07 2008
@@ -258,6 +258,8 @@
qpid/broker/HeadersExchange.cpp \
qpid/broker/IncomingExecutionContext.cpp \
qpid/broker/IncompleteMessageList.cpp \
+ qpid/broker/Link.cpp \
+ qpid/broker/LinkRegistry.cpp \
qpid/broker/Message.cpp \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
@@ -291,7 +293,7 @@
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
qpid/management/Manageable.cpp \
- qpid/management/ManagementAgent.cpp \
+ qpid/management/ManagementBroker.cpp \
qpid/management/ManagementExchange.cpp \
qpid/management/ManagementObject.cpp \
qpid/sys/TCPIOPlugin.cpp
@@ -382,6 +384,8 @@
qpid/broker/HeadersExchange.h \
qpid/broker/IncomingExecutionContext.h \
qpid/broker/IncompleteMessageList.h \
+ qpid/broker/Link.h \
+ qpid/broker/LinkRegistry.h \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
@@ -391,6 +395,7 @@
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.h \
qpid/broker/Persistable.h \
+ qpid/broker/PersistableConfig.h \
qpid/broker/PersistableExchange.h \
qpid/broker/PersistableMessage.h \
qpid/broker/PersistableQueue.h \
@@ -398,6 +403,7 @@
qpid/broker/QueueBindings.h \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.h \
+ qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
qpid/broker/RecoverableMessage.h \
qpid/broker/RecoverableQueue.h \
@@ -506,6 +512,7 @@
qpid/management/Args.h \
qpid/management/Manageable.h \
qpid/management/ManagementAgent.h \
+ qpid/management/ManagementBroker.h \
qpid/management/ManagementExchange.h \
qpid/management/ManagementObject.h \
qpid/sys/AggregateOutput.h \
@@ -527,6 +534,7 @@
qpid/sys/OutputControl.h \
qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
+ qpid/sys/ProtocolAccess.h \
qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Mon May 12 10:04:07 2008
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "qpid/sys/ProtocolAccess.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/exceptions.h"
@@ -27,9 +28,13 @@
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
- identifier(id), initialized(false), isClient(_isClient) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a)
+ : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)),
+ identifier(id), initialized(false), isClient(_isClient)
+{
+ if (a != 0)
+ a->callConnCb(connection);
+}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
@@ -45,13 +50,13 @@
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
+ connection->received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection.doOutput();
+ if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -90,7 +95,7 @@
}
void Connection::closed() {
- connection.closed();
+ connection->closed();
}
void Connection::send(framing::AMQFrame& f) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Mon May 12 10:04:07 2008
@@ -29,6 +29,7 @@
#include <queue>
namespace qpid {
+namespace sys { class ProtocolAccess; }
namespace broker { class Broker; }
namespace amqp_0_10 {
@@ -40,13 +41,13 @@
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- broker::Connection connection; // FIXME aconway 2008-03-18:
+ broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
bool isClient;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
+ Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Mon May 12 10:04:07 2008
@@ -31,10 +31,12 @@
namespace qpid {
namespace broker {
-Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
- args(_args), channel(id, &(c.getOutput())), peer(channel),
- mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- connection(c), listener(l), name(Uuid(true).str())
+Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+ const management::ArgsLinkBridge& _args) :
+ id(_id), args(_args),
+ mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
+ args.i_key, args.i_src_is_queue, args.i_src_is_local)),
+ listener(l), name(Uuid(true).str())
{
management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
@@ -44,18 +46,21 @@
mgmtObject->resourceDestroy();
}
-void Bridge::create()
+void Bridge::create(ConnectionState& c)
{
- framing::AMQP_ServerProxy::Session session(channel);
- session.attach(name, false);
+ channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
@@ -66,22 +71,22 @@
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
}
+
bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
- peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
-
}
void Bridge::cancel()
{
- peer.getMessage().cancel(args.i_dest);
- peer.getSession().detach(name);
+ peer->getMessage().cancel(args.i_dest);
+ peer->getSession().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
@@ -94,8 +99,6 @@
if (methodId == management::Bridge::METHOD_CLOSE) {
//notify that we are closed
listener(this);
- //request time on the connections io thread
- connection.getOutput().activateOutput();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Mon May 12 10:04:07 2008
@@ -28,33 +28,36 @@
#include "qpid/management/Bridge.h"
#include <boost/function.hpp>
+#include <memory>
namespace qpid {
namespace broker {
class ConnectionState;
+class Link;
class Bridge : public management::Manageable
{
public:
typedef boost::function<void(Bridge*)> CancellationListener;
- Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
- const management::ArgsLinkBridge& args);
+ Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args);
~Bridge();
- void create();
+ void create(ConnectionState& c);
void cancel();
management::ManagementObject::shared_ptr GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args);
private:
- management::ArgsLinkBridge args;
- framing::ChannelHandler channel;
- framing::AMQP_ServerProxy peer;
- management::Bridge::shared_ptr mgmtObject;
- ConnectionState& connection;
+ std::auto_ptr<framing::ChannelHandler> channelHandler;
+ std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
+ std::auto_ptr<framing::AMQP_ServerProxy> peer;
+
+ framing::ChannelId id;
+ management::ArgsLinkBridge args;
+ management::Bridge::shared_ptr mgmtObject;
CancellationListener listener;
std::string name;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon May 12 10:04:07 2008
@@ -28,6 +28,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "Link.h"
#include "qpid/management/PackageQpid.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/management/ArgsBrokerEcho.h"
@@ -60,7 +61,7 @@
using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
-using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -129,15 +130,16 @@
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ links(this),
factory(*this),
sessionManager(conf.ack)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
- ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
- conf.mgmtPubInterval);
- managementAgent = ManagementAgent::getAgent ();
- managementAgent->setInterval (conf.mgmtPubInterval);
+ ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval, this);
+ managementAgent = management::ManagementAgent::getAgent ();
+ ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
@@ -163,6 +165,7 @@
queues.setParent (vhost);
exchanges.setParent (vhost);
+ links.setParent (vhost);
}
// Early-Initialize plugins
@@ -178,11 +181,12 @@
queues.setStore (store);
dtxManager.setStore (store);
+ links.setStore (store);
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if (store != 0) {
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
@@ -197,8 +201,9 @@
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
- managementAgent->setExchange (mExchange, dExchange);
- dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent
+ ((ManagementBroker*) managementAgent.get());
}
else
QPID_LOG(info, "Management not enabled");
@@ -285,7 +290,7 @@
Broker::~Broker() {
shutdown();
- ManagementAgent::shutdown ();
+ ManagementBroker::shutdown ();
delete store;
if (config.auth) {
#if HAVE_SASL
@@ -319,7 +324,15 @@
case management::Broker::METHOD_CONNECT : {
management::ArgsBrokerConnect& hp=
dynamic_cast<management::ArgsBrokerConnect&>(args);
- connect(hp.i_host, hp.i_port);
+
+ if (hp.i_useSsl)
+ return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
+
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+ if (hp.i_durable && response.second)
+ store->create(*response.first);
+
status = Manageable::STATUS_OK;
break;
}
@@ -355,10 +368,11 @@
// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
- const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* f)
+ const std::string& host, uint16_t port, bool /*useSsl*/,
+ sys::ConnectionCodec::Factory* f,
+ sys::ProtocolAccess* access)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
}
void Broker::connect(
@@ -366,7 +380,7 @@
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- connect(addr.host, addr.port, f);
+ connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon May 12 10:04:07 2008
@@ -29,11 +29,12 @@
#include "ExchangeRegistry.h"
#include "MessageStore.h"
#include "QueueRegistry.h"
+#include "LinkRegistry.h"
#include "SessionManager.h"
#include "Vhost.h"
#include "System.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/management/Broker.h"
#include "qpid/management/ArgsBrokerConnect.h"
#include "qpid/Options.h"
@@ -43,6 +44,7 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/ProtocolAccess.h"
#include <vector>
@@ -111,6 +113,7 @@
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
+ LinkRegistry& getLinks() { return links; }
uint64_t getStagingThreshold() { return config.stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
@@ -130,11 +133,16 @@
void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* =0);
+ void connect(const std::string& host, uint16_t port, bool useSsl,
+ sys::ConnectionCodec::Factory* =0,
+ sys::ProtocolAccess* =0);
/** Create a connection to another broker. */
void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
+ // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
+ // For the present just return the first ProtocolFactory registered.
+ boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
+
private:
boost::shared_ptr<sys::Poller> poller;
Options config;
@@ -144,6 +152,7 @@
QueueRegistry queues;
ExchangeRegistry exchanges;
+ LinkRegistry links;
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
@@ -152,10 +161,6 @@
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
- // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
- // For the present just return the first ProtocolFactory registered.
- boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
-
void declareStandardExchange(const std::string& name, const std::string& type);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Mon May 12 10:04:07 2008
@@ -52,37 +52,14 @@
management::Client::shared_ptr mgmtClient;
public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming);
~MgmtClient();
void received(framing::AMQFrame& frame);
management::ManagementObject::shared_ptr getManagementObject() const;
void closing();
};
-class Connection::MgmtLink : public Connection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(Connection& connection, const management::Args& args);
-};
-
-
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
@@ -103,14 +80,21 @@
if (agent.get () != 0)
{
if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
} else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
}
}
}
}
+void Connection::requestIOProcessing (boost::function0<void> callback)
+{
+ ioCallback = callback;
+ out->activateOutput();
+}
+
+
Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
@@ -160,8 +144,9 @@
bool Connection::doOutput()
{
try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (ioCallback)
+ ioCallback(); // Lend the IO thread for management processing
+ ioCallback = 0;
if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
@@ -192,8 +177,7 @@
return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& args)
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -207,93 +191,17 @@
out->activateOutput();
status = Manageable::STATUS_OK;
break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
}
return status;
}
-Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-Connection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void Connection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void Connection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void Connection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void Connection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
+ ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming)
{
mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
+ (new management::Client (conn, parent, mgmtId, incoming));
agent->addObject (mgmtClient);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Mon May 12 10:04:07 2008
@@ -54,6 +54,7 @@
public ConnectionState
{
public:
+ typedef boost::shared_ptr<Connection> shared_ptr;
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -78,6 +79,7 @@
ManagementMethod (uint32_t methodId, management::Args& args);
void initMgmt(bool asLink = false);
+ void requestIOProcessing (boost::function0<void>);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -100,7 +102,6 @@
virtual void process(Connection&, const management::Args&){}
};
class MgmtClient;
- class MgmtLink;
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
@@ -108,6 +109,7 @@
std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
const std::string mgmtId;
+ boost::function0<void> ioCallback;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Mon May 12 10:04:07 2008
@@ -39,9 +39,9 @@
}
sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true);
+ return new amqp_0_10::Connection(out, broker, id, true, a);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Mon May 12 10:04:07 2008
@@ -24,6 +24,7 @@
#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
+namespace sys { class ProtocolAccess; }
namespace broker {
class Broker;
@@ -37,7 +38,7 @@
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id);
+ create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
private:
Broker& broker;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon May 12 10:04:07 2008
@@ -35,8 +35,9 @@
namespace
{
-const std::string PLAIN = "PLAIN";
-const std::string en_US = "en_US";
+const std::string ANONYMOUS = "ANONYMOUS";
+const std::string PLAIN = "PLAIN";
+const std::string en_US = "en_US";
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId)
@@ -135,10 +136,8 @@
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
- string uid = "qpidd";
- string pwd = "qpidd";
- string response = ((char)0) + uid + ((char)0) + pwd;
- server.startOk(FieldTable(), PLAIN, response, en_US);
+ string response;
+ server.startOk(FieldTable(), ANONYMOUS, response, en_US);
connection.initMgmt(true);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon May 12 10:04:07 2008
@@ -40,7 +40,7 @@
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name));
+ (new management::Exchange (this, parent, _name, durable));
agent->addObject (mgmtExchange);
}
}
@@ -56,8 +56,9 @@
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name));
- agent->addObject (mgmtExchange);
+ (new management::Exchange (this, parent, _name, durable));
+ if (!durable)
+ agent->addObject (mgmtExchange);
}
}
}
@@ -68,6 +69,16 @@
mgmtExchange->resourceDestroy ();
}
+void Exchange::setPersistenceId(uint64_t id) const
+{
+ if (mgmtExchange != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtExchange, id, 2);
+ }
+ persistenceId = id;
+}
+
Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
{
string name;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon May 12 10:04:07 2008
@@ -90,7 +90,7 @@
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
//PersistableExchange:
- void setPersistenceId(uint64_t id) const { persistenceId = id; }
+ void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Mon May 12 10:04:07 2008
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Link.h"
+#include "LinkRegistry.h"
+#include "Broker.h"
+#include "Connection.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/Link.h"
+#include "boost/bind.hpp"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+Link::Link(LinkRegistry* _links,
+ string& _host,
+ uint16_t _port,
+ bool _useSsl,
+ bool _durable,
+ Broker* _broker,
+ management::Manageable* parent)
+ : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ persistenceId(0), broker(_broker), state(0),
+ access(boost::bind(&Link::established, this),
+ boost::bind(&Link::closed, this, _1, _2),
+ boost::bind(&Link::setConnection, this, _1)),
+ visitCount(0),
+ currentInterval(1),
+ closing(false),
+ channelCounter(1)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+ if (agent.get() != 0)
+ {
+ mgmtObject = management::Link::shared_ptr
+ (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+ if (!durable)
+ agent->addObject(mgmtObject);
+ }
+ }
+ setState(STATE_WAITING);
+}
+
+Link::~Link ()
+{
+ if (state == STATE_OPERATIONAL)
+ access.close();
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+void Link::setState (int newState)
+{
+ if (newState == state)
+ return;
+
+ state = newState;
+ if (mgmtObject.get() == 0)
+ return;
+
+ switch (state)
+ {
+ case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
+ case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
+ case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ }
+}
+
+void Link::startConnection ()
+{
+ try {
+ broker->connect (host, port, useSsl, 0, &access);
+ setState(STATE_CONNECTING);
+ } catch(std::exception& e) {
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (e.what());
+ }
+}
+
+void Link::established ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
+ setState(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+}
+
+void Link::closed (int, std::string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_OPERATIONAL)
+ QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
+
+ connection.reset();
+ created.transfer(created.end(), active.begin(), active.end(), active);
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ if (closing)
+ destroy();
+}
+
+void Link::destroy ()
+{
+ QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ connection.reset();
+ links->destroy (host, port);
+}
+
+void Link::cancel(Bridge* bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //need to take this out of the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == bridge) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+
+ if (connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::ioThreadProcessing()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create(*connection);
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Link::setConnection(Connection::shared_ptr c)
+{
+ connection = c;
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::maintenanceVisit ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_WAITING)
+ {
+ visitCount++;
+ if (visitCount >= currentInterval)
+ {
+ visitCount = 0;
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnection();
+ }
+ }
+}
+
+void Link::setPersistenceId(uint64_t id) const
+{
+ if (mgmtObject != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject, id);
+ }
+ persistenceId = id;
+}
+
+const string& Link::getName() const
+{
+ return host;
+}
+
+Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string host;
+ uint16_t port;
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+ bool useSsl(buffer.getOctet());
+ bool durable(buffer.getOctet());
+
+ return links.declare(host, port, useSsl, durable).first;
+}
+
+void Link::encode(Buffer& buffer) const
+{
+ buffer.putShortString(string("link"));
+ buffer.putShortString(host);
+ buffer.putShort(port);
+ buffer.putOctet(useSsl ? 1 : 0);
+ buffer.putOctet(durable ? 1 : 0);
+}
+
+uint32_t Link::encodedSize() const
+{
+ return host.size() + 1 // short-string (host)
+ + 5 // short-string ("link")
+ + 2 // port
+ + 1 // useSsl
+ + 1; // durable
+}
+
+ManagementObject::shared_ptr Link::GetManagementObject (void) const
+{
+ return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ switch (op)
+ {
+ case management::Link::METHOD_CLOSE :
+ closing = true;
+ if (state != STATE_CONNECTING)
+ destroy();
+ return Manageable::STATUS_OK;
+
+ case management::Link::METHOD_BRIDGE :
+ management::ArgsLinkBridge iargs =
+ dynamic_cast<const management::ArgsLinkBridge&>(args);
+
+ // Durable bridges are only valid on durable links
+ if (iargs.i_durable && !durable)
+ return Manageable::STATUS_INVALID_PARAMETER;
+
+ created.push_back(new Bridge(this, channelCounter++,
+ boost::bind(&Link::cancel, this, _1), iargs));
+
+ if (state == STATE_OPERATIONAL && connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ return Manageable::STATUS_OK;
+ }
+
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Mon May 12 10:04:07 2008
@@ -0,0 +1,115 @@
+#ifndef _broker_Link_h
+#define _broker_Link_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "MessageStore.h"
+#include "PersistableConfig.h"
+#include "Bridge.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/ProtocolAccess.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Link.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace qpid {
+ namespace broker {
+
+ using std::string;
+ class LinkRegistry;
+ class Broker;
+ class Connection;
+
+ class Link : public PersistableConfig, public management::Manageable {
+ private:
+ sys::Mutex lock;
+ LinkRegistry* links;
+ const string host;
+ const uint16_t port;
+ const bool useSsl;
+ const bool durable;
+ mutable uint64_t persistenceId;
+ management::Link::shared_ptr mgmtObject;
+ Broker* broker;
+ int state;
+ sys::ProtocolAccess access;
+ uint32_t visitCount;
+ uint32_t currentInterval;
+ bool closing;
+
+ typedef boost::ptr_vector<Bridge> Bridges;
+ Bridges created; // Bridges pending creation
+ Bridges active; // Bridges active
+ Bridges cancelled; // Bridges pending deletion
+ uint channelCounter;
+ boost::shared_ptr<Connection> connection;
+
+ static const int STATE_WAITING = 1;
+ static const int STATE_CONNECTING = 2;
+ static const int STATE_OPERATIONAL = 3;
+
+ static const uint32_t MAX_INTERVAL = 16;
+
+ void setState (int newState);
+ void startConnection(); // Start the IO Connection
+ void established(); // Called when connection is created
+ void closed(int, std::string); // Called when connection goes away
+ void destroy(); // Called when mgmt deletes this link
+ void cancel(Bridge*); // Called by self-cancelling bridge
+ void ioThreadProcessing(); // Called on connection's IO thread by request
+ void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
+
+ public:
+ typedef boost::shared_ptr<Link> shared_ptr;
+
+ Link(LinkRegistry* links,
+ string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ Broker* broker,
+ management::Manageable* parent = 0);
+ virtual ~Link();
+
+ bool isDurable() { return durable; }
+ void maintenanceVisit ();
+
+ // PersistableConfig:
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+ const string& getName() const;
+
+ static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t ManagementMethod (uint32_t, management::Args&);
+ };
+ }
+}
+
+
+#endif /*!_broker_Link.cpp_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon May 12 10:04:07 2008
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LinkRegistry.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using std::pair;
+using std::stringstream;
+using boost::intrusive_ptr;
+
+#define LINK_MAINT_INTERVAL 5
+
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+{
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+}
+
+LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
+ TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC)), links(_links) {}
+
+void LinkRegistry::Periodic::fire ()
+{
+ links.periodicMaintenance ();
+ links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+}
+
+void LinkRegistry::periodicMaintenance ()
+{
+ Mutex::ScopedLock locker(lock);
+ linksToDestroy.clear();
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
+ i->second->maintenanceVisit();
+}
+
+pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string key = string(keystream.str());
+
+ LinkMap::iterator i = links.find(key);
+ if (i == links.end())
+ {
+ Link::shared_ptr link;
+
+ link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent));
+ links[key] = link;
+ return std::pair<Link::shared_ptr, bool>(link, true);
+ }
+ return std::pair<Link::shared_ptr, bool>(i->second, false);
+}
+
+void LinkRegistry::destroy(const string& host, const uint16_t port)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string key = string(keystream.str());
+
+ LinkMap::iterator i = links.find(key);
+ if (i != links.end())
+ {
+ if (i->second->isDurable() && store)
+ store->destroy(*(i->second));
+ linksToDestroy[key] = i->second;
+ links.erase(i);
+ }
+}
+
+void LinkRegistry::setStore (MessageStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ store = _store;
+}
+
+MessageStore* LinkRegistry::getStore() const {
+ return store;
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Mon May 12 10:04:07 2008
@@ -0,0 +1,87 @@
+#ifndef _broker_LinkRegistry_h
+#define _broker_LinkRegistry_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include "Link.h"
+#include "MessageStore.h"
+#include "Timer.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+
+ class Broker;
+ class LinkRegistry {
+
+ // Declare a timer task to manage the establishment of link connections and the
+ // re-establishment of lost link connections.
+ struct Periodic : public TimerTask
+ {
+ LinkRegistry& links;
+
+ Periodic(LinkRegistry& links);
+ virtual ~Periodic() {};
+ void fire();
+ };
+
+ typedef std::map<std::string, Link::shared_ptr> LinkMap;
+ LinkMap links;
+ LinkMap linksToDestroy;
+ qpid::sys::Mutex lock;
+ Broker* broker;
+ Timer timer;
+ management::Manageable* parent;
+ MessageStore* store;
+
+ void periodicMaintenance ();
+
+ public:
+ LinkRegistry (Broker* _broker);
+ std::pair<Link::shared_ptr, bool> declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable);
+ void destroy(const std::string& host, const uint16_t port);
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
+
+ /**
+ * Set the store to use. May only be called once.
+ */
+ void setStore (MessageStore*);
+
+ /**
+ * Return the message store used.
+ */
+ MessageStore* getStore() const;
+ };
+}
+}
+
+
+#endif /*!_broker_LinkRegistry_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Mon May 12 10:04:07 2008
@@ -24,6 +24,7 @@
#include "PersistableExchange.h"
#include "PersistableMessage.h"
#include "PersistableQueue.h"
+#include "PersistableConfig.h"
#include "RecoveryManager.h"
#include "TransactionalStore.h"
#include "qpid/framing/FieldTable.h"
@@ -87,6 +88,16 @@
const std::string& key, const framing::FieldTable& args) = 0;
/**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config) = 0;
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config) = 0;
+
+ /**
* Stores a messages before it has been enqueued
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Mon May 12 10:04:07 2008
@@ -70,6 +70,16 @@
TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
}
+void MessageStoreModule::create(const PersistableConfig& config)
+{
+ TRANSFER_EXCEPTION(store->create(config));
+}
+
+void MessageStoreModule::destroy(const PersistableConfig& config)
+{
+ TRANSFER_EXCEPTION(store->destroy(config));
+}
+
void MessageStoreModule::recover(RecoveryManager& registry)
{
TRANSFER_EXCEPTION(store->recover(registry));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Mon May 12 10:04:07 2008
@@ -57,6 +57,8 @@
const std::string& key, const framing::FieldTable& args);
void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
+ void create(const PersistableConfig& config);
+ void destroy(const PersistableConfig& config);
void recover(RecoveryManager& queues);
void stage(boost::intrusive_ptr<PersistableMessage>& msg);
void destroy(PersistableMessage& msg);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Mon May 12 10:04:07 2008
@@ -49,7 +49,7 @@
using namespace qpid::broker;
-NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn), nextPersistenceId(1) {}
bool NullMessageStore::init(const Options* /*options*/) {return true;}
@@ -57,6 +57,7 @@
{
QPID_LOG(info, "Queue '" << queue.getName()
<< "' will not be durable. Persistence not enabled.");
+ queue.setPersistenceId(nextPersistenceId++);
}
void NullMessageStore::destroy(PersistableQueue&)
@@ -67,6 +68,7 @@
{
QPID_LOG(info, "Exchange'" << exchange.getName()
<< "' will not be durable. Persistence not enabled.");
+ exchange.setPersistenceId(nextPersistenceId++);
}
void NullMessageStore::destroy(const PersistableExchange& )
@@ -76,6 +78,17 @@
void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
+void NullMessageStore::create(const PersistableConfig& config)
+{
+ QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+ config.setPersistenceId(nextPersistenceId++);
+}
+
+void NullMessageStore::destroy(const PersistableConfig&)
+{
+ QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+}
+
void NullMessageStore::recover(RecoveryManager&)
{
QPID_LOG(info, "Persistence not enabled, no recovery attempted.");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Mon May 12 10:04:07 2008
@@ -37,6 +37,7 @@
{
std::set<std::string> prepared;
const bool warn;
+ uint64_t nextPersistenceId;
public:
NullMessageStore(bool warn = false);
@@ -57,6 +58,8 @@
const std::string& key, const framing::FieldTable& args);
virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
+ virtual void create(const PersistableConfig& config);
+ virtual void destroy(const PersistableConfig& config);
virtual void recover(RecoveryManager& queues);
virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg);
virtual void destroy(PersistableMessage& msg);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h Mon May 12 10:04:07 2008
@@ -0,0 +1,45 @@
+#ifndef _broker_PersistableConfig_h
+#define _broker_PersistableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface used by general-purpose persistable configuration for
+ * the message store.
+ */
+class PersistableConfig : public Persistable
+{
+public:
+ virtual const std::string& getName() const = 0;
+ virtual ~PersistableConfig() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon May 12 10:04:07 2008
@@ -586,7 +586,7 @@
if (mgmtObject != 0 && persistenceId == 0)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject, _persistenceId);
+ agent->addObject (mgmtObject, _persistenceId, 3);
}
persistenceId = _persistenceId;
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h Mon May 12 10:04:07 2008
@@ -0,0 +1,45 @@
+#ifndef _broker_RecoverableConfig_h
+#define _broker_RecoverableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which configurations are recovered.
+ */
+class RecoverableConfig
+{
+public:
+ typedef boost::shared_ptr<RecoverableConfig> shared_ptr;
+
+ virtual void setPersistenceId(uint64_t id) = 0;
+ virtual ~RecoverableConfig() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h Mon May 12 10:04:07 2008
@@ -25,6 +25,7 @@
#include "RecoverableQueue.h"
#include "RecoverableMessage.h"
#include "RecoverableTransaction.h"
+#include "RecoverableConfig.h"
#include "TransactionalStore.h"
#include "qpid/framing/Buffer.h"
@@ -39,6 +40,8 @@
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn) = 0;
+ virtual RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer) = 0;
+
virtual void recoveryComplete() = 0;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon May 12 10:04:07 2008
@@ -22,6 +22,7 @@
#include "Message.h"
#include "Queue.h"
+#include "Link.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -34,9 +35,9 @@
static const uint8_t BASIC = 1;
static const uint8_t MESSAGE = 2;
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges,
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
DtxManager& _dtxMgr, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -82,6 +83,15 @@
void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
};
+class RecoverableConfigImpl : public RecoverableConfig
+{
+ // TODO: Add links for other config types, consider using super class (PersistableConfig?)
+ Link::shared_ptr link;
+public:
+ RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+ void setPersistenceId(uint64_t id);
+};
+
class RecoverableTransactionImpl : public RecoverableTransaction
{
DtxBuffer::shared_ptr buffer;
@@ -125,6 +135,19 @@
return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
}
+RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
+{
+ string kind;
+
+ buffer.getShortString (kind);
+ if (kind == "link")
+ {
+ return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
+ }
+
+ return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+}
+
void RecoveryManagerImpl::recoveryComplete()
{
//TODO (finalise binding setup etc)
@@ -185,6 +208,13 @@
exchange->setPersistenceId(id);
}
+void RecoverableConfigImpl::setPersistenceId(uint64_t id)
+{
+ if (link.get())
+ link->setPersistenceId(id);
+ // TODO: add calls to other types. Consider using a parent class.
+}
+
void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
{
Queue::shared_ptr queue = queues.find(queueName);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Mon May 12 10:04:07 2008
@@ -25,6 +25,7 @@
#include "DtxManager.h"
#include "ExchangeRegistry.h"
#include "QueueRegistry.h"
+#include "LinkRegistry.h"
#include "RecoveryManager.h"
namespace qpid {
@@ -33,10 +34,12 @@
class RecoveryManagerImpl : public RecoveryManager{
QueueRegistry& queues;
ExchangeRegistry& exchanges;
+ LinkRegistry& links;
DtxManager& dtxMgr;
const uint64_t stagingThreshold;
public:
- RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold);
+ RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
+ DtxManager& dtxMgr, uint64_t stagingThreshold);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
@@ -44,6 +47,7 @@
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
+ RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer);
void recoveryComplete();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h Mon May 12 10:04:07 2008
@@ -42,9 +42,6 @@
management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
-
- management::Manageable::status_t ManagementMethod (uint32_t, management::Args&)
- { return management::Manageable::STATUS_OK; }
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp Mon May 12 10:04:07 2008
@@ -25,13 +25,19 @@
{
switch (status)
{
- case STATUS_OK : return "OK";
- case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
- case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
- case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
- case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+ case STATUS_OK : return "OK";
+ case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
+ case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
+ case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
+ case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+ case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
}
return "??";
}
+Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&)
+{
+ return STATUS_UNKNOWN_METHOD;
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h Mon May 12 10:04:07 2008
@@ -39,11 +39,12 @@
typedef uint32_t status_t;
static std::string StatusText (status_t status);
- static const status_t STATUS_OK = 0;
- static const status_t STATUS_UNKNOWN_OBJECT = 1;
- static const status_t STATUS_UNKNOWN_METHOD = 2;
- static const status_t STATUS_NOT_IMPLEMENTED = 3;
- static const status_t STATUS_INVALID_PARAMETER = 4;
+ static const status_t STATUS_OK = 0;
+ static const status_t STATUS_UNKNOWN_OBJECT = 1;
+ static const status_t STATUS_UNKNOWN_METHOD = 2;
+ static const status_t STATUS_NOT_IMPLEMENTED = 3;
+ static const status_t STATUS_INVALID_PARAMETER = 4;
+ static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
// Every "Manageable" object must hold a reference to exactly one
// management object. This object is always of a class derived from
@@ -58,7 +59,7 @@
// on this object. The input and output arguments are specific to the
// method being called and must be down-cast to the appropriate sub class
// before use.
- virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0;
+ virtual status_t ManagementMethod (uint32_t methodId, Args& args);
};
inline Manageable::~Manageable (void) {}