You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/05/12 19:04:08 UTC

svn commit: r655563 [1/3] - in /incubator/qpid/trunk/qpid: cpp/managementgen/templates/ cpp/src/ cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/management/ cpp/src/qpid/sys/ cpp/src/qpid/sys/posix/ cpp/src/tests/ python/commands/ python/qpid...

Author: gsim
Date: Mon May 12 10:04:07 2008
New Revision: 655563

URL: http://svn.apache.org/viewvc?rev=655563&view=rev
Log:
QPID-1050: Patch from Ted Ross:

1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
    a) Links can be created even if the remote broker is not reachable
    b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable. 

(Note: some refactoring around ProtocolAccess is still needed to reduce dependencies)


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolAccess.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/python/commands/qpid-config
    incubator/qpid/trunk/qpid/python/commands/qpid-route
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/qpid/managementdata.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Mon May 12 10:04:07 2008
@@ -24,6 +24,7 @@
 /*MGEN:Root.Disclaimer*/
 
 #include "qpid/management/ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/Uuid.h"
 
 namespace qpid {

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon May 12 10:04:07 2008
@@ -258,6 +258,8 @@
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/IncomingExecutionContext.cpp \
   qpid/broker/IncompleteMessageList.cpp \
+  qpid/broker/Link.cpp \
+  qpid/broker/LinkRegistry.cpp \
   qpid/broker/Message.cpp \
   qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
@@ -291,7 +293,7 @@
   qpid/broker/TxPublish.cpp \
   qpid/broker/Vhost.cpp \
   qpid/management/Manageable.cpp \
-  qpid/management/ManagementAgent.cpp \
+  qpid/management/ManagementBroker.cpp \
   qpid/management/ManagementExchange.cpp \
   qpid/management/ManagementObject.cpp \
   qpid/sys/TCPIOPlugin.cpp
@@ -382,6 +384,8 @@
   qpid/broker/HeadersExchange.h \
   qpid/broker/IncomingExecutionContext.h \
   qpid/broker/IncompleteMessageList.h \
+  qpid/broker/Link.h \
+  qpid/broker/LinkRegistry.h \
   qpid/broker/Message.h \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.h \
@@ -391,6 +395,7 @@
   qpid/broker/NameGenerator.h \
   qpid/broker/NullMessageStore.h \
   qpid/broker/Persistable.h \
+  qpid/broker/PersistableConfig.h \
   qpid/broker/PersistableExchange.h \
   qpid/broker/PersistableMessage.h \
   qpid/broker/PersistableQueue.h \
@@ -398,6 +403,7 @@
   qpid/broker/QueueBindings.h \
   qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.h \
+  qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \
   qpid/broker/RecoverableMessage.h \
   qpid/broker/RecoverableQueue.h \
@@ -506,6 +512,7 @@
   qpid/management/Args.h \
   qpid/management/Manageable.h \
   qpid/management/ManagementAgent.h \
+  qpid/management/ManagementBroker.h \
   qpid/management/ManagementExchange.h \
   qpid/management/ManagementObject.h \
   qpid/sys/AggregateOutput.h \
@@ -527,6 +534,7 @@
   qpid/sys/OutputControl.h \
   qpid/sys/OutputTask.h \
   qpid/sys/Poller.h \
+  qpid/sys/ProtocolAccess.h \
   qpid/sys/ProtocolFactory.h \
   qpid/sys/Runnable.h \
   qpid/sys/ScopedIncrement.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Mon May 12 10:04:07 2008
@@ -19,6 +19,7 @@
  *
  */
 #include "Connection.h"
+#include "qpid/sys/ProtocolAccess.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/exceptions.h"
 
@@ -27,9 +28,13 @@
 
 using sys::Mutex;
 
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
-        : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
-          identifier(id), initialized(false), isClient(_isClient) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a)
+    : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)),
+          identifier(id), initialized(false), isClient(_isClient)
+{
+    if (a != 0)
+        a->callConnCb(connection);
+}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
     framing::Buffer in(const_cast<char*>(buffer), size);
@@ -45,13 +50,13 @@
     framing::AMQFrame frame;
     while(frame.decode(in)) {
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        connection.received(frame);
+        connection->received(frame);
     }
     return in.getPosition();
 }
 
 bool Connection::canEncode() {
-    if (!frameQueueClosed) connection.doOutput();
+    if (!frameQueueClosed) connection->doOutput();
     Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
@@ -90,7 +95,7 @@
 }
 
 void  Connection::closed() {
-    connection.closed();
+    connection->closed();
 }
 
 void Connection::send(framing::AMQFrame& f) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Mon May 12 10:04:07 2008
@@ -29,6 +29,7 @@
 #include <queue>
 
 namespace qpid {
+namespace sys { class ProtocolAccess; }
 namespace broker { class Broker; }
 namespace amqp_0_10 {
 
@@ -40,13 +41,13 @@
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
-    broker::Connection connection; // FIXME aconway 2008-03-18: 
+    broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18: 
     std::string identifier;
     bool initialized;
     bool isClient;
     
   public:
-    Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
+    Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0);
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool isClosed() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Mon May 12 10:04:07 2008
@@ -31,10 +31,12 @@
 namespace qpid {
 namespace broker {
 
-Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : 
-    args(_args), channel(id, &(c.getOutput())), peer(channel), 
-    mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
-    connection(c), listener(l), name(Uuid(true).str())
+Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+               const management::ArgsLinkBridge& _args) : 
+    id(_id), args(_args),
+    mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
+                                      args.i_key, args.i_src_is_queue, args.i_src_is_local)),
+    listener(l), name(Uuid(true).str())
 {
     management::ManagementAgent::getAgent()->addObject(mgmtObject);
 }
@@ -44,18 +46,21 @@
     mgmtObject->resourceDestroy(); 
 }
 
-void Bridge::create()
+void Bridge::create(ConnectionState& c)
 {
-    framing::AMQP_ServerProxy::Session session(channel);
-    session.attach(name, false);
+    channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+    session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+    peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+    session->attach(name, false);
 
     if (args.i_src_is_local) {
         //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
     } else {
         if (args.i_src_is_queue) {
-            peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
-            peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-            peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+            peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+            peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+            peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         } else {
             string queue = "bridge_queue_";
             queue += Uuid(true).str();
@@ -66,22 +71,22 @@
             if (args.i_excludes.size()) {
                 queueSettings.setString("qpid.trace.exclude", args.i_excludes);
             }
+
             bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
             bool autoDelete = !durable;//auto delete transient queues?
-            peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
-            peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
-            peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
-            peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-            peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+            peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+            peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+            peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+            peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+            peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         }
     }
-
 }
 
 void Bridge::cancel()
 {
-    peer.getMessage().cancel(args.i_dest);    
-    peer.getSession().detach(name);
+    peer->getMessage().cancel(args.i_dest);
+    peer->getSession().detach(name);
 }
 
 management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
@@ -94,8 +99,6 @@
     if (methodId == management::Bridge::METHOD_CLOSE) {  
         //notify that we are closed
         listener(this);
-        //request time on the connections io thread
-        connection.getOutput().activateOutput();
         return management::Manageable::STATUS_OK;
     } else {
         return management::Manageable::STATUS_UNKNOWN_METHOD;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Mon May 12 10:04:07 2008
@@ -28,33 +28,36 @@
 #include "qpid/management/Bridge.h"
 
 #include <boost/function.hpp>
+#include <memory>
 
 namespace qpid {
 namespace broker {
 
 class ConnectionState;
+class Link;
 
 class Bridge : public management::Manageable
 {
 public:
     typedef boost::function<void(Bridge*)> CancellationListener;
 
-    Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
-           const management::ArgsLinkBridge& args);
+    Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args);
     ~Bridge();
 
-    void create();
+    void create(ConnectionState& c);
     void cancel();
 
     management::ManagementObject::shared_ptr GetManagementObject() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args);
 
 private:
-    management::ArgsLinkBridge args;
-    framing::ChannelHandler channel;
-    framing::AMQP_ServerProxy peer;
-    management::Bridge::shared_ptr mgmtObject;
-    ConnectionState& connection;
+    std::auto_ptr<framing::ChannelHandler>            channelHandler;
+    std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
+    std::auto_ptr<framing::AMQP_ServerProxy>          peer;
+
+    framing::ChannelId                  id;
+    management::ArgsLinkBridge          args;
+    management::Bridge::shared_ptr      mgmtObject;
     CancellationListener listener;
     std::string name;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon May 12 10:04:07 2008
@@ -28,6 +28,7 @@
 #include "NullMessageStore.h"
 #include "RecoveryManagerImpl.h"
 #include "TopicExchange.h"
+#include "Link.h"
 #include "qpid/management/PackageQpid.h"
 #include "qpid/management/ManagementExchange.h"
 #include "qpid/management/ArgsBrokerEcho.h"
@@ -60,7 +61,7 @@
 using qpid::sys::Thread;
 using qpid::framing::FrameHandler;
 using qpid::framing::ChannelId;
-using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -129,15 +130,16 @@
     config(conf),
     store(0),
     dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+    links(this),
     factory(*this),
     sessionManager(conf.ack)
 {
     if(conf.enableMgmt){
         QPID_LOG(info, "Management enabled");
-        ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
-                                           conf.mgmtPubInterval);
-        managementAgent = ManagementAgent::getAgent ();
-        managementAgent->setInterval (conf.mgmtPubInterval);
+        ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+                                            conf.mgmtPubInterval, this);
+        managementAgent = management::ManagementAgent::getAgent ();
+        ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
         qpid::management::PackageQpid packageInitializer (managementAgent);
 
         System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
@@ -163,6 +165,7 @@
 
         queues.setParent    (vhost);
         exchanges.setParent (vhost);
+        links.setParent     (vhost);
     }
 
     // Early-Initialize plugins
@@ -178,11 +181,12 @@
 
     queues.setStore     (store);
     dtxManager.setStore (store);
+    links.setStore      (store);
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     
     if (store != 0) {
-        RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, 
+        RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, 
                                       conf.stagingThreshold);
         store->recover(recoverer);
     }
@@ -197,8 +201,9 @@
         exchanges.declare(qpid_management, ManagementExchange::typeName);
         Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
         Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
-        managementAgent->setExchange (mExchange, dExchange);
-        dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+        ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange);
+        dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent
+            ((ManagementBroker*) managementAgent.get());
     }
     else
         QPID_LOG(info, "Management not enabled");
@@ -285,7 +290,7 @@
 
 Broker::~Broker() {
     shutdown();
-    ManagementAgent::shutdown ();
+    ManagementBroker::shutdown ();
     delete store;    
     if (config.auth) {
 #if HAVE_SASL
@@ -319,7 +324,15 @@
       case management::Broker::METHOD_CONNECT : {
         management::ArgsBrokerConnect& hp=
             dynamic_cast<management::ArgsBrokerConnect&>(args);
-        connect(hp.i_host, hp.i_port);
+
+        if (hp.i_useSsl)
+            return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
+
+        std::pair<Link::shared_ptr, bool> response =
+            links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+        if (hp.i_durable && response.second)
+            store->create(*response.first);
+
         status = Manageable::STATUS_OK;
         break;
       }
@@ -355,10 +368,11 @@
 
 // TODO: How to chose the protocolFactory to use for the connection
 void Broker::connect(
-    const std::string& host, uint16_t port,
-    sys::ConnectionCodec::Factory* f)
+    const std::string& host, uint16_t port, bool /*useSsl*/,
+    sys::ConnectionCodec::Factory* f,
+    sys::ProtocolAccess* access)
 {
-    getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
+    getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
 }
 
 void Broker::connect(
@@ -366,7 +380,7 @@
 {
     url.throwIfEmpty();
     TcpAddress addr=boost::get<TcpAddress>(url[0]);
-    connect(addr.host, addr.port, f);
+    connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon May 12 10:04:07 2008
@@ -29,11 +29,12 @@
 #include "ExchangeRegistry.h"
 #include "MessageStore.h"
 #include "QueueRegistry.h"
+#include "LinkRegistry.h"
 #include "SessionManager.h"
 #include "Vhost.h"
 #include "System.h"
 #include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
 #include "qpid/management/Broker.h"
 #include "qpid/management/ArgsBrokerConnect.h"
 #include "qpid/Options.h"
@@ -43,6 +44,7 @@
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
+#include "qpid/sys/ProtocolAccess.h"
 
 #include <vector>
 
@@ -111,6 +113,7 @@
     MessageStore& getStore() { return *store; }
     QueueRegistry& getQueues() { return queues; }
     ExchangeRegistry& getExchanges() { return exchanges; }
+    LinkRegistry& getLinks() { return links; }
     uint64_t getStagingThreshold() { return config.stagingThreshold; }
     DtxManager& getDtxManager() { return dtxManager; }
     DataDir& getDataDir() { return dataDir; }
@@ -130,11 +133,16 @@
     void accept();
 
     /** Create a connection to another broker. */
-    void connect(const std::string& host, uint16_t port,
-                 sys::ConnectionCodec::Factory* =0);
+    void connect(const std::string& host, uint16_t port, bool useSsl,
+                 sys::ConnectionCodec::Factory* =0,
+                 sys::ProtocolAccess* =0);
     /** Create a connection to another broker. */
     void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
 
+    // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
+    // For the present just return the first ProtocolFactory registered.
+    boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
+
   private:
     boost::shared_ptr<sys::Poller> poller;
     Options config;
@@ -144,6 +152,7 @@
 
     QueueRegistry queues;
     ExchangeRegistry exchanges;
+    LinkRegistry links;
     ConnectionFactory factory;
     DtxManager dtxManager;
     SessionManager sessionManager;
@@ -152,10 +161,6 @@
     Vhost::shared_ptr              vhostObject;
     System::shared_ptr             systemObject;
 
-    // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
-    // For the present just return the first ProtocolFactory registered.
-    boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
-
     void declareStandardExchange(const std::string& name, const std::string& type);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Mon May 12 10:04:07 2008
@@ -52,37 +52,14 @@
     management::Client::shared_ptr mgmtClient;
 
 public:
-    MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+    MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
+               const std::string& mgmtId, bool incoming);
     ~MgmtClient();
     void received(framing::AMQFrame& frame);
     management::ManagementObject::shared_ptr getManagementObject() const;
     void closing();
 };
 
-class Connection::MgmtLink : public Connection::MgmtWrapper
-{
-    typedef boost::ptr_vector<Bridge> Bridges;
-
-    management::Link::shared_ptr mgmtLink;
-    Bridges created;//holds list of bridges pending creation
-    Bridges cancelled;//holds list of bridges pending cancellation
-    Bridges active;//holds active bridges
-    uint channelCounter;
-    sys::Mutex linkLock;
-
-    void cancel(Bridge*);
-
-public:
-    MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
-    ~MgmtLink();
-    void received(framing::AMQFrame& frame);
-    management::ManagementObject::shared_ptr getManagementObject() const;
-    void closing();
-    void processPending();
-    void process(Connection& connection, const management::Args& args);
-};
-
-
 Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
     ConnectionState(out_, broker_),
     adapter(*this, isLink),
@@ -103,14 +80,21 @@
         if (agent.get () != 0)
         {
             if (asLink) {
-                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
             } else {
-                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
             }
         }
     }
 }
 
+void Connection::requestIOProcessing (boost::function0<void> callback)
+{
+    ioCallback = callback;
+    out->activateOutput();
+}
+
+
 Connection::~Connection () {}
 
 void Connection::received(framing::AMQFrame& frame){
@@ -160,8 +144,9 @@
 bool Connection::doOutput()
 {    
     try{
-        //process any pending mgmt commands:
-        if (mgmtWrapper.get()) mgmtWrapper->processPending();
+        if (ioCallback)
+            ioCallback(); // Lend the IO thread for management processing
+        ioCallback = 0;
         if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
 
         //then do other output as needed:
@@ -192,8 +177,7 @@
     return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
 }
 
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
-                                                   Args&    args)
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 
@@ -207,93 +191,17 @@
         out->activateOutput();
         status = Manageable::STATUS_OK;
         break;
-    case management::Link::METHOD_BRIDGE :
-        //queue this up and request chance to do output (i.e. get connections thread of control):
-        mgmtWrapper->process(*this, args);
-        out->activateOutput();
-        status = Manageable::STATUS_OK;
-        break;
     }
 
     return status;
 }
 
-Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) 
-    : channelCounter(1)
-{
-    mgmtLink = management::Link::shared_ptr
-        (new management::Link(conn, parent, mgmtId));
-    agent->addObject (mgmtLink);
-}
-
-Connection::MgmtLink::~MgmtLink()
-{
-    if (mgmtLink.get () != 0)
-        mgmtLink->resourceDestroy ();
-}
-
-void Connection::MgmtLink::received(framing::AMQFrame& frame)
-{
-    if (mgmtLink.get () != 0)
-    {
-        mgmtLink->inc_framesFromPeer ();
-        mgmtLink->inc_bytesFromPeer (frame.size ());
-    }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
-{
-    return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void Connection::MgmtLink::closing()
-{
-    if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void Connection::MgmtLink::processPending()
-{
-    Mutex::ScopedLock l(linkLock);
-    //process any pending creates
-    if (!created.empty()) {
-        for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
-            i->create();
-        }
-        active.transfer(active.end(), created.begin(), created.end(), created);
-    }
-    if (!cancelled.empty()) {
-        //process any pending cancellations
-        for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
-            i->cancel();
-        }
-        cancelled.clear();
-    }
-}
-
-void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
-{   
-    Mutex::ScopedLock l(linkLock);
-    created.push_back(new Bridge(channelCounter++, connection, 
-                                 boost::bind(&MgmtLink::cancel, this, _1),
-                                 dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void Connection::MgmtLink::cancel(Bridge* b)
-{   
-    Mutex::ScopedLock l(linkLock);
-    //need to take this out the active map and add it to the cancelled map
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        if (&(*i) == b) {
-            cancelled.transfer(cancelled.end(), i, active);
-            break;
-        }
-    }
-}
-
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
+                                   ManagementAgent::shared_ptr agent,
+                                   const std::string& mgmtId, bool incoming)
 {
     mgmtClient = management::Client::shared_ptr
-        (new management::Client (conn, parent, mgmtId));
+        (new management::Client (conn, parent, mgmtId, incoming));
     agent->addObject (mgmtClient);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Mon May 12 10:04:07 2008
@@ -54,6 +54,7 @@
                    public ConnectionState
 {
   public:
+    typedef boost::shared_ptr<Connection> shared_ptr;
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
     ~Connection ();
 
@@ -78,6 +79,7 @@
         ManagementMethod (uint32_t methodId, management::Args& args);
 
     void initMgmt(bool asLink = false);
+    void requestIOProcessing (boost::function0<void>);
 
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -100,7 +102,6 @@
         virtual void process(Connection&, const management::Args&){}
     };
     class MgmtClient;
-    class MgmtLink;
 
     ChannelMap channels;
     framing::AMQP_ClientProxy::Connection* client;
@@ -108,6 +109,7 @@
     std::auto_ptr<MgmtWrapper> mgmtWrapper;
     bool mgmtClosing;
     const std::string mgmtId;
+    boost::function0<void> ioCallback;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Mon May 12 10:04:07 2008
@@ -39,9 +39,9 @@
 }
 
 sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
     // used to create connections from one broker to another
-    return new amqp_0_10::Connection(out, broker, id, true);
+    return new amqp_0_10::Connection(out, broker, id, true, a);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Mon May 12 10:04:07 2008
@@ -24,6 +24,7 @@
 #include "qpid/sys/ConnectionCodec.h"
 
 namespace qpid {
+namespace sys { class ProtocolAccess; }
 namespace broker {
 class Broker;
 
@@ -37,7 +38,7 @@
     create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
 
     sys::ConnectionCodec*
-    create(sys::OutputControl&, const std::string& id);
+    create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
 
   private:
     Broker& broker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon May 12 10:04:07 2008
@@ -35,8 +35,9 @@
 
 namespace 
 {
-const std::string PLAIN = "PLAIN";
-const std::string en_US = "en_US";
+const std::string ANONYMOUS = "ANONYMOUS";
+const std::string PLAIN     = "PLAIN";
+const std::string en_US     = "en_US";
 }
 
 void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId)
@@ -135,10 +136,8 @@
                                        const framing::Array& /*mechanisms*/,
                                        const framing::Array& /*locales*/)
 {
-    string uid = "qpidd";
-    string pwd = "qpidd";
-    string response = ((char)0) + uid + ((char)0) + pwd;
-    server.startOk(FieldTable(), PLAIN, response, en_US);
+    string response;
+    server.startOk(FieldTable(), ANONYMOUS, response, en_US);
     connection.initMgmt(true);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon May 12 10:04:07 2008
@@ -40,7 +40,7 @@
         if (agent.get () != 0)
         {
             mgmtExchange = management::Exchange::shared_ptr
-                (new management::Exchange (this, parent, _name));
+                (new management::Exchange (this, parent, _name, durable));
             agent->addObject (mgmtExchange);
         }
     }
@@ -56,8 +56,9 @@
         if (agent.get () != 0)
         {
             mgmtExchange = management::Exchange::shared_ptr
-                (new management::Exchange (this, parent, _name));
-            agent->addObject (mgmtExchange);
+                (new management::Exchange (this, parent, _name, durable));
+            if (!durable)
+                agent->addObject (mgmtExchange);
         }
     }
 }
@@ -68,6 +69,16 @@
         mgmtExchange->resourceDestroy ();
 }
 
+// Record the id assigned by the store.  The first time an id is assigned
+// (the previous id was 0) a managed exchange is also registered with the
+// management agent under that id.
+void Exchange::setPersistenceId(uint64_t id) const
+{
+    const bool firstAssignment = (persistenceId == 0);
+    if (firstAssignment && mgmtExchange.get() != 0)
+        ManagementAgent::getAgent()->addObject(mgmtExchange, id, 2);
+
+    persistenceId = id;
+}
+
 Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
 {
     string name;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon May 12 10:04:07 2008
@@ -90,7 +90,7 @@
             virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
 
             //PersistableExchange:
-            void setPersistenceId(uint64_t id) const { persistenceId = id; }
+            void setPersistenceId(uint64_t id) const;
             uint64_t getPersistenceId() const { return persistenceId; }
             uint32_t encodedSize() const;
             void encode(framing::Buffer& buffer) const; 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Mon May 12 10:04:07 2008
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Link.h"
+#include "LinkRegistry.h"
+#include "Broker.h"
+#include "Connection.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/Link.h"
+#include "boost/bind.hpp"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+// Construct a link to the broker at _host:_port.
+//
+// The three callbacks bound into `access` route connection events back to
+// this object (established / closed / setConnection).  When a management
+// parent is supplied and a management agent exists, a management::Link
+// object is created; it is added to the agent here only for non-durable
+// links (setPersistenceId() adds it when the first persistence id is set).
+// The link leaves the constructor in STATE_WAITING.
+Link::Link(LinkRegistry*           _links,
+           string&                 _host,
+           uint16_t                _port,
+           bool                    _useSsl,
+           bool                    _durable,
+           Broker*                 _broker,
+           management::Manageable* parent)
+    : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+      persistenceId(0), broker(_broker), state(0),
+      access(boost::bind(&Link::established, this),
+             boost::bind(&Link::closed, this, _1, _2),
+             boost::bind(&Link::setConnection, this, _1)),
+      visitCount(0),
+      currentInterval(1),
+      closing(false),
+      channelCounter(1)
+{
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+        if (agent.get() != 0)
+        {
+            mgmtObject = management::Link::shared_ptr
+                (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+            if (!durable)
+                agent->addObject(mgmtObject);
+        }
+    }
+    // state was initialised to 0 above, so this call always takes effect
+    // and publishes "Waiting" to the management object when there is one.
+    setState(STATE_WAITING);
+}
+
+// Close the protocol access if the link is still operational and tell the
+// management object (when present) that its resource is gone.
+Link::~Link ()
+{
+    const bool connected = (state == STATE_OPERATIONAL);
+    if (connected) {
+        access.close();
+    }
+
+    if (mgmtObject) {
+        mgmtObject->resourceDestroy ();
+    }
+}
+
+// Move the link to newState.  A no-op when the state is unchanged; otherwise
+// the new state is stored and, when a management object exists, its textual
+// state is updated for the three known states.
+void Link::setState (int newState)
+{
+    if (newState != state) {
+        state = newState;
+
+        if (mgmtObject.get() != 0) {
+            if (state == STATE_WAITING)
+                mgmtObject->set_state("Waiting");
+            else if (state == STATE_CONNECTING)
+                mgmtObject->set_state("Connecting");
+            else if (state == STATE_OPERATIONAL)
+                mgmtObject->set_state("Operational");
+        }
+    }
+}
+
+// Ask the broker to open the outbound connection for this link.
+// On success the link moves to STATE_CONNECTING.  If the connect call
+// throws, the link returns to STATE_WAITING and the error text is published
+// through the management object when one exists.
+void Link::startConnection ()
+{
+    try {
+        broker->connect (host, port, useSsl, 0, &access);
+        setState(STATE_CONNECTING);
+    } catch(std::exception& e) {
+        setState(STATE_WAITING);
+        // mgmtObject is only created when a management parent and agent
+        // exist (see the constructor), so it may be null here.
+        if (mgmtObject.get() != 0)
+            mgmtObject->set_lastError (e.what());
+    }
+}
+
+// Connection-established callback (bound into `access` by the constructor).
+// Marks the link operational and resets the reconnect back-off counters.
+// If a close was requested while connecting, the link is destroyed now.
+void Link::established ()
+{
+    Mutex::ScopedLock guard(lock);
+
+    QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
+    setState(STATE_OPERATIONAL);
+
+    // Reset the counters consulted by maintenanceVisit().
+    visitCount      = 0;
+    currentInterval = 1;
+
+    if (!closing)
+        return;
+    destroy();
+}
+
+// Connection-closed callback (bound into `access` by the constructor).
+// The first argument is unused; `text` is recorded as the last error.
+// Drops the connection reference, moves every active bridge back to the
+// pending-creation list, and returns the link to STATE_WAITING.  If a
+// close was requested, the link is destroyed.
+void Link::closed (int, std::string text)
+{
+    Mutex::ScopedLock mutex(lock);
+
+    if (state == STATE_OPERATIONAL)
+        QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
+
+    connection.reset();
+    created.transfer(created.end(), active.begin(), active.end(), active);
+    setState(STATE_WAITING);
+    // mgmtObject is only created when a management parent and agent exist
+    // (see the constructor), so it may be null here.
+    if (mgmtObject.get() != 0)
+        mgmtObject->set_lastError (text);
+    if (closing)
+        destroy();
+}
+
+// Remove this link at management's request: log the removal, release the
+// connection reference and ask the registry to drop the host:port entry.
+void Link::destroy ()
+{
+    QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+
+    connection = Connection::shared_ptr();
+    links->destroy(host, port);
+}
+
+// Called by a bridge that cancels itself.  Moves the bridge from the active
+// list to the cancelled list and, when a connection is present, asks it to
+// run ioThreadProcessing() so the cancellation is carried out there.
+void Link::cancel(Bridge* bridge)
+{
+    Mutex::ScopedLock mutex(lock);
+
+    // Take the bridge out of the active list and append it to the cancelled
+    // list; bridges are matched by address.
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        if (&(*i) == bridge) {
+            cancelled.transfer(cancelled.end(), i, active);
+            // Stop immediately: `i` must not be used after the transfer.
+            break;
+        }
+    }
+
+    if (connection.get() != 0)
+        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+// Callback handed to Connection::requestIOProcessing().  Creates every
+// pending bridge on the current connection and moves it to the active list,
+// then cancels and discards every bridge on the cancelled list.
+void Link::ioThreadProcessing()
+{
+    Mutex::ScopedLock mutex(lock);
+
+    //process any pending creates
+    // NOTE(review): *connection is dereferenced without a null check, while
+    // closed() and destroy() reset `connection` -- confirm this callback
+    // cannot run after either of them.
+    if (!created.empty()) {
+        for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+            i->create(*connection);
+        }
+        active.transfer(active.end(), created.begin(), created.end(), created);
+    }
+    if (!cancelled.empty()) {
+        //process any pending cancellations
+        for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+            i->cancel();
+        }
+        // clear() on the ptr_vector also deletes the cancelled bridges.
+        cancelled.clear();
+    }
+}
+
+// Callback bound into `access`: remember the AMQP connection backing this
+// link and request a run of ioThreadProcessing() on it.
+void Link::setConnection(Connection::shared_ptr c)
+{
+    connection = c;
+    c->requestIOProcessing(boost::bind(&Link::ioThreadProcessing, this));
+}
+
+// Periodic tick from the registry.  While the link is waiting, counts visits
+// and starts a connection attempt once the count reaches the current
+// interval; the interval doubles after each attempt, capped at MAX_INTERVAL.
+void Link::maintenanceVisit ()
+{
+    Mutex::ScopedLock guard(lock);
+
+    if (state != STATE_WAITING)
+        return;
+
+    ++visitCount;
+    if (visitCount < currentInterval)
+        return;
+
+    // Time for another attempt: restart the count and lengthen the wait
+    // that follows it.
+    visitCount = 0;
+    currentInterval *= 2;
+    if (currentInterval > MAX_INTERVAL)
+        currentInterval = MAX_INTERVAL;
+
+    startConnection();
+}
+
+// Record the id assigned by the store.  The first time an id is assigned
+// (the previous id was 0) an existing management object is also added to
+// the management agent under that id.
+void Link::setPersistenceId(uint64_t id) const
+{
+    const bool firstAssignment = (persistenceId == 0);
+    if (firstAssignment && mgmtObject.get() != 0)
+        ManagementAgent::getAgent()->addObject(mgmtObject, id);
+
+    persistenceId = id;
+}
+
+// PersistableConfig name of this link: the remote host only (the port is
+// not part of the returned name).
+const string& Link::getName() const
+{
+    return host;
+}
+
+// Rebuild a link from its stored form and declare it in `links`.
+// Reads host, port, useSsl and durable in the order encode() writes them.
+// NOTE(review): the leading "link" tag written by encode() is not read
+// here -- presumably the caller consumes it first; confirm.
+Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
+{
+    string host;
+    buffer.getShortString(host);
+
+    const uint16_t port    = buffer.getShort();
+    const bool     useSsl  = buffer.getOctet();
+    const bool     durable = buffer.getOctet();
+
+    // declare() returns (link, created); only the link is of interest.
+    return links.declare(host, port, useSsl, durable).first;
+}
+
+// Serialise this link: the tag "link", then host, port and the two flags
+// (useSsl, durable) as single octets holding 0 or 1.
+void Link::encode(Buffer& buffer) const 
+{
+    const int sslOctet     = useSsl  ? 1 : 0;
+    const int durableOctet = durable ? 1 : 0;
+
+    buffer.putShortString(string("link"));
+    buffer.putShortString(host);
+    buffer.putShort(port);
+    buffer.putOctet(sslOctet);
+    buffer.putOctet(durableOctet);
+}
+
+// Number of bytes encode() writes: each short-string costs its length plus
+// one, followed by a two-byte port and two one-byte flags.
+uint32_t Link::encodedSize() const 
+{ 
+    const uint32_t tagBytes   = 5;  // short-string ("link")
+    const uint32_t portBytes  = 2;  // port
+    const uint32_t flagBytes  = 2;  // useSsl + durable
+
+    return host.size() + 1          // short-string (host)
+        + tagBytes + portBytes + flagBytes;
+}
+
+// Manageable entry point: the link's management object viewed as a generic
+// ManagementObject (null when the link has no management object).
+ManagementObject::shared_ptr Link::GetManagementObject (void) const
+{
+    ManagementObject::shared_ptr generic =
+        boost::dynamic_pointer_cast<ManagementObject> (mgmtObject);
+    return generic;
+}
+
+// Manageable entry point: dispatch a management method on this link.
+//
+//   METHOD_CLOSE  - mark the link as closing; it is destroyed immediately
+//                   unless a connection attempt is in progress, in which
+//                   case established()/closed() destroy it later.
+//   METHOD_BRIDGE - queue a new bridge for creation; rejected with
+//                   STATUS_INVALID_PARAMETER for a durable bridge on a
+//                   non-durable link.
+//
+// Any other method id yields STATUS_UNKNOWN_METHOD.
+//
+// NOTE(review): destroy() is called with this link's lock held and goes on
+// to take the registry lock in LinkRegistry::destroy(), whereas
+// LinkRegistry::periodicMaintenance() takes the registry lock and then the
+// link lock via maintenanceVisit() -- confirm the two cannot deadlock.
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
+{
+    Mutex::ScopedLock mutex(lock);
+
+    switch (op)
+    {
+    case management::Link::METHOD_CLOSE :
+        closing = true;
+        if (state != STATE_CONNECTING)
+            destroy();
+        return Manageable::STATUS_OK;
+
+    case management::Link::METHOD_BRIDGE :
+        // The reference dynamic_cast throws std::bad_cast if args is not an
+        // ArgsLinkBridge; iargs is a copy of the arguments.
+        management::ArgsLinkBridge iargs =
+            dynamic_cast<const management::ArgsLinkBridge&>(args);
+
+        // Durable bridges are only valid on durable links
+        if (iargs.i_durable && !durable)
+            return Manageable::STATUS_INVALID_PARAMETER;
+
+        created.push_back(new Bridge(this, channelCounter++,
+                                     boost::bind(&Link::cancel, this, _1), iargs));
+
+        // With no usable connection the bridge stays on the pending list
+        // until setConnection() requests processing.
+        if (state == STATE_OPERATIONAL && connection.get() != 0)
+            connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+        return Manageable::STATUS_OK;
+    }
+
+    return Manageable::STATUS_UNKNOWN_METHOD;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Mon May 12 10:04:07 2008
@@ -0,0 +1,115 @@
+#ifndef _broker_Link_h
+#define _broker_Link_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "MessageStore.h"
+#include "PersistableConfig.h"
+#include "Bridge.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/ProtocolAccess.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Link.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace qpid {
+    namespace broker {
+
+        using std::string;
+        class LinkRegistry;
+        class Broker;
+        class Connection;
+
+        // A link from this broker to a remote broker identified by
+        // host:port.  A link owns the bridges created on it, can be stored
+        // as durable configuration (PersistableConfig) and is exposed
+        // through the management interface (Manageable).
+        class Link : public PersistableConfig, public management::Manageable {
+        private:
+            sys::Mutex          lock;
+            LinkRegistry*       links;
+            const string        host;
+            const uint16_t      port;
+            const bool          useSsl;
+            const bool          durable;
+            mutable uint64_t    persistenceId;   // mutable: set via the const setPersistenceId()
+            management::Link::shared_ptr mgmtObject;
+            Broker* broker;
+            int     state;                       // one of the STATE_* constants below
+            sys::ProtocolAccess access;
+            uint32_t visitCount;                 // maintenance visits since the last connect attempt
+            uint32_t currentInterval;            // visits to wait before the next connect attempt
+            bool     closing;                    // set once management has asked for the link to close
+
+            typedef boost::ptr_vector<Bridge> Bridges;
+            Bridges created;   // Bridges pending creation
+            Bridges active;    // Bridges active
+            Bridges cancelled; // Bridges pending deletion
+            uint channelCounter;
+            boost::shared_ptr<Connection> connection;
+
+            static const int STATE_WAITING     = 1;
+            static const int STATE_CONNECTING  = 2;
+            static const int STATE_OPERATIONAL = 3;
+
+            // Upper bound for currentInterval, counted in maintenance visits.
+            static const uint32_t MAX_INTERVAL = 16;
+
+            void setState (int newState);
+            void startConnection();          // Start the IO Connection
+            void established();              // Called when connection is created
+            void closed(int, std::string);   // Called when connection goes away
+            void destroy();                  // Called when mgmt deletes this link
+            void cancel(Bridge*);            // Called by self-cancelling bridge
+            void ioThreadProcessing();       // Called on connection's IO thread by request
+            void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
+
+        public:
+            typedef boost::shared_ptr<Link> shared_ptr;
+
+            Link(LinkRegistry*           links,
+                 string&                 host,
+                 uint16_t                port,
+                 bool                    useSsl,
+                 bool                    durable,
+                 Broker*                 broker,
+                 management::Manageable* parent = 0);
+            virtual ~Link();
+
+            bool isDurable() { return durable; }
+            // Periodic tick; drives (re)connection attempts while waiting.
+            void maintenanceVisit ();
+
+            // PersistableConfig:
+            void     setPersistenceId(uint64_t id) const;
+            uint64_t getPersistenceId() const { return persistenceId; }
+            uint32_t encodedSize() const;
+            void     encode(framing::Buffer& buffer) const; 
+            const string& getName() const;
+
+            // Rebuild a link from its encoded form and declare it in `links`.
+            static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+            // Manageable entry points
+            management::ManagementObject::shared_ptr GetManagementObject (void) const;
+            management::Manageable::status_t ManagementMethod (uint32_t, management::Args&);
+        };
+    }
+}
+
+
+#endif  /*!_broker_Link_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon May 12 10:04:07 2008
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LinkRegistry.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using std::pair;
+using std::stringstream;
+using boost::intrusive_ptr;
+
+#define LINK_MAINT_INTERVAL 5
+
+// Create an empty registry (no management parent, no store yet) and
+// schedule the first periodic maintenance task on the registry's timer.
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+{
+    intrusive_ptr<TimerTask> firstTick(new Periodic(*this));
+    timer.add(firstTick);
+}
+
+// Timer task set up with a duration of LINK_MAINT_INTERVAL seconds; keeps a
+// reference to the registry it services.
+LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
+    TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC)), links(_links) {}
+
+// Timer callback: run one maintenance pass over the registry, then queue a
+// fresh Periodic task so the pass repeats.
+void LinkRegistry::Periodic::fire ()
+{
+    links.periodicMaintenance ();
+
+    intrusive_ptr<TimerTask> nextTick(new Periodic(links));
+    links.timer.add(nextTick);
+}
+
+// One maintenance pass: release the links parked by destroy() since the
+// previous pass, then give every registered link its maintenance visit.
+void LinkRegistry::periodicMaintenance ()
+{
+    Mutex::ScopedLock guard(lock);
+
+    linksToDestroy.clear();
+
+    LinkMap::iterator current = links.begin();
+    while (current != links.end()) {
+        current->second->maintenanceVisit();
+        current++;
+    }
+}
+
+// Look up the link keyed by "host:port", creating it when absent.
+// Returns the link together with a flag that is true only when this call
+// created it.
+pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+                                                   uint16_t     port,
+                                                   bool         useSsl,
+                                                   bool         durable)
+{
+    Mutex::ScopedLock guard(lock);
+
+    stringstream keyBuilder;
+    keyBuilder << host << ":" << port;
+    const string key(keyBuilder.str());
+
+    LinkMap::iterator existing = links.find(key);
+    if (existing != links.end())
+        return std::pair<Link::shared_ptr, bool>(existing->second, false);
+
+    Link::shared_ptr fresh(new Link (this, host, port, useSsl, durable, broker, parent));
+    links[key] = fresh;
+    return std::pair<Link::shared_ptr, bool>(fresh, true);
+}
+
+// Remove the link keyed by "host:port", if present.  A durable link is also
+// removed from the store when one is set.  The link is parked in
+// linksToDestroy, so it stays alive until the next maintenance pass clears
+// that map.
+void LinkRegistry::destroy(const string& host, const uint16_t port)
+{
+    Mutex::ScopedLock guard(lock);
+
+    stringstream keyBuilder;
+    keyBuilder << host << ":" << port;
+    const string key(keyBuilder.str());
+
+    LinkMap::iterator found = links.find(key);
+    if (found == links.end())
+        return;
+
+    if (found->second->isDurable() && store)
+        store->destroy(*(found->second));
+
+    linksToDestroy[key] = found->second;
+    links.erase(found);
+}
+
+// Set the store used for durable links.  The assert requires that no store
+// has been set yet and that the new one is non-null.
+void LinkRegistry::setStore (MessageStore* _store)
+{
+    assert (store == 0 && _store != 0);
+    store = _store;
+}
+
+// Return the store set by setStore(), or 0 if none has been set.
+MessageStore* LinkRegistry::getStore() const {
+    return store;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Mon May 12 10:04:07 2008
@@ -0,0 +1,87 @@
+#ifndef _broker_LinkRegistry_h
+#define _broker_LinkRegistry_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include "Link.h"
+#include "MessageStore.h"
+#include "Timer.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+
+    class Broker;
+    // Registry of inter-broker links, keyed by "host:port".  It creates and
+    // removes links and gives each registered link a periodic maintenance
+    // visit from a timer task.
+    class LinkRegistry {
+
+        // Declare a timer task to manage the establishment of link connections and the
+        // re-establishment of lost link connections.
+        struct Periodic : public TimerTask
+        {
+            LinkRegistry& links;
+
+            Periodic(LinkRegistry& links);
+            virtual ~Periodic() {};
+            void fire();
+        };
+
+        typedef std::map<std::string, Link::shared_ptr> LinkMap;
+        LinkMap links;            // registered links, keyed by "host:port"
+        LinkMap linksToDestroy;   // removed links, released on the next maintenance pass
+        qpid::sys::Mutex lock;
+        Broker* broker;
+        Timer   timer;
+        management::Manageable* parent;
+        MessageStore* store;
+
+        void periodicMaintenance ();
+
+    public:
+        LinkRegistry (Broker* _broker);
+
+        /**
+         * Find the link for host:port, creating it if it does not exist.
+         * The bool in the returned pair is true when the link was created.
+         */
+        std::pair<Link::shared_ptr, bool> declare(std::string& host,
+                                                  uint16_t     port,
+                                                  bool         useSsl,
+                                                  bool         durable);
+
+        /**
+         * Remove the link for host:port, if it is registered.
+         */
+        void destroy(const std::string& host, const uint16_t port);
+
+        /**
+         * Register the manageable parent for declared links
+         */
+        void setParent (management::Manageable* _parent) { parent = _parent; }
+
+        /**
+         * Set the store to use.  May only be called once.
+         */
+        void setStore (MessageStore*);
+
+        /**
+         * Return the message store used.
+         */
+        MessageStore* getStore() const;
+    };
+}
+}
+
+
+#endif  /*!_broker_LinkRegistry_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Mon May 12 10:04:07 2008
@@ -24,6 +24,7 @@
 #include "PersistableExchange.h"
 #include "PersistableMessage.h"
 #include "PersistableQueue.h"
+#include "PersistableConfig.h"
 #include "RecoveryManager.h"
 #include "TransactionalStore.h"
 #include "qpid/framing/FieldTable.h"
@@ -87,6 +88,16 @@
                         const std::string& key, const framing::FieldTable& args) = 0;
 
     /**
+     * Record generic durable configuration
+     */
+    virtual void create(const PersistableConfig& config) = 0;
+
+    /**
+     * Destroy generic durable configuration
+     */
+    virtual void destroy(const PersistableConfig& config) = 0;
+
+    /**
      * Stores a messages before it has been enqueued
      * (enqueueing automatically stores the message so this is
      * only required if storage is required prior to that

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Mon May 12 10:04:07 2008
@@ -70,6 +70,16 @@
     TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
 }
 
+// Forward creation of generic durable configuration to the wrapped store.
+// NOTE(review): TRANSFER_EXCEPTION is defined outside this view; presumably
+// it re-throws store exceptions in a broker-friendly form -- confirm.
+void MessageStoreModule::create(const PersistableConfig& config)
+{
+    TRANSFER_EXCEPTION(store->create(config));
+}
+
+// Forward destruction of generic durable configuration to the wrapped store.
+// NOTE(review): TRANSFER_EXCEPTION is defined outside this view; presumably
+// it re-throws store exceptions in a broker-friendly form -- confirm.
+void MessageStoreModule::destroy(const PersistableConfig& config)
+{
+    TRANSFER_EXCEPTION(store->destroy(config));
+}
+
 void MessageStoreModule::recover(RecoveryManager& registry)
 {
     TRANSFER_EXCEPTION(store->recover(registry));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Mon May 12 10:04:07 2008
@@ -57,6 +57,8 @@
               const std::string& key, const framing::FieldTable& args);
     void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, 
                 const std::string& key, const framing::FieldTable& args);
+    void create(const PersistableConfig& config);
+    void destroy(const PersistableConfig& config);
     void recover(RecoveryManager& queues);
     void stage(boost::intrusive_ptr<PersistableMessage>& msg);
     void destroy(PersistableMessage& msg);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Mon May 12 10:04:07 2008
@@ -49,7 +49,7 @@
 
 using namespace qpid::broker;
 
-NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn), nextPersistenceId(1) {} // ids handed out by create() start at 1
 
 bool NullMessageStore::init(const Options* /*options*/) {return true;}
 
@@ -57,6 +57,7 @@
 {
     QPID_LOG(info, "Queue '" << queue.getName() 
              << "' will not be durable. Persistence not enabled.");
+    queue.setPersistenceId(nextPersistenceId++);
 }
 
 void NullMessageStore::destroy(PersistableQueue&)
@@ -67,6 +68,7 @@
 {
     QPID_LOG(info, "Exchange'" << exchange.getName() 
              << "' will not be durable. Persistence not enabled.");
+    exchange.setPersistenceId(nextPersistenceId++);
 }
 
 void NullMessageStore::destroy(const PersistableExchange& )
@@ -76,6 +78,17 @@
 
 void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
 
+void NullMessageStore::create(const PersistableConfig& config)
+{ // Stores nothing: logs at info level and still gives the config a persistence id.
+    QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+    config.setPersistenceId(nextPersistenceId++); // post-increment: the counter advances for the next create()
+}
+
+void NullMessageStore::destroy(const PersistableConfig&)
+{ // Only emits the info-level log line below; the argument is unused.
+    QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+}
+
 void NullMessageStore::recover(RecoveryManager&)
 {
     QPID_LOG(info, "Persistence not enabled, no recovery attempted.");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Mon May 12 10:04:07 2008
@@ -37,6 +37,7 @@
 {
     std::set<std::string> prepared;
     const bool warn;
+    uint64_t nextPersistenceId;
 public:
     NullMessageStore(bool warn = false);
 
@@ -57,6 +58,8 @@
                       const std::string& key, const framing::FieldTable& args);
     virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, 
                         const std::string& key, const framing::FieldTable& args);
+    virtual void create(const PersistableConfig& config);
+    virtual void destroy(const PersistableConfig& config);
     virtual void recover(RecoveryManager& queues);
     virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg);
     virtual void destroy(PersistableMessage& msg);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h Mon May 12 10:04:07 2008
@@ -0,0 +1,45 @@
+#ifndef _broker_PersistableConfig_h
+#define _broker_PersistableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Interface for general-purpose configuration records that are handed to
+ * the message store. Extends Persistable with a name accessor.
+ */
+class PersistableConfig : public Persistable
+{
+public:
+    virtual  const std::string& getName() const = 0; // pure virtual: each implementation supplies its name
+    virtual ~PersistableConfig() {}; // virtual so deletion through this interface is safe
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableConfig.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon May 12 10:04:07 2008
@@ -586,7 +586,7 @@
     if (mgmtObject != 0 && persistenceId == 0)
     {
         ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-        agent->addObject (mgmtObject, _persistenceId);
+        agent->addObject (mgmtObject, _persistenceId, 3);
     }
     persistenceId = _persistenceId;
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h?rev=655563&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h Mon May 12 10:04:07 2008
@@ -0,0 +1,45 @@
+#ifndef _broker_RecoverableConfig_h
+#define _broker_RecoverableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Interface for a configuration item being recovered; RecoveryManager::recoverConfig() returns one.
+ */
+class RecoverableConfig
+{
+public:
+    typedef boost::shared_ptr<RecoverableConfig> shared_ptr;
+
+    virtual void setPersistenceId(uint64_t id) = 0; // pure virtual: assigns the persistence id to the recovered item
+    virtual ~RecoverableConfig() {}; // virtual so deletion through this interface is safe
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableConfig.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h Mon May 12 10:04:07 2008
@@ -25,6 +25,7 @@
 #include "RecoverableQueue.h"
 #include "RecoverableMessage.h"
 #include "RecoverableTransaction.h"
+#include "RecoverableConfig.h"
 #include "TransactionalStore.h"
 #include "qpid/framing/Buffer.h"
 
@@ -39,6 +40,8 @@
     virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
     virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, 
                                                                   std::auto_ptr<TPCTransactionContext> txn) = 0;
+    virtual RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer) = 0;
+
     virtual void recoveryComplete() = 0;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon May 12 10:04:07 2008
@@ -22,6 +22,7 @@
 
 #include "Message.h"
 #include "Queue.h"
+#include "Link.h"
 #include "RecoveredEnqueue.h"
 #include "RecoveredDequeue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -34,9 +35,9 @@
 static const uint8_t BASIC = 1;
 static const uint8_t MESSAGE = 2;
 
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, 
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
                                          DtxManager& _dtxMgr, uint64_t _stagingThreshold) 
-    : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
 
 RecoveryManagerImpl::~RecoveryManagerImpl() {}
 
@@ -82,6 +83,15 @@
     void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
 };
 
+class RecoverableConfigImpl : public RecoverableConfig
+{
+    // TODO: Add members for other config types; consider holding a common base class (PersistableConfig?) instead.
+    Link::shared_ptr link; // the only config type handled so far; null-checked in setPersistenceId()
+public:
+    RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+    void setPersistenceId(uint64_t id); // defined further down in this file
+};
+
 class RecoverableTransactionImpl : public RecoverableTransaction
 {
     DtxBuffer::shared_ptr buffer;
@@ -125,6 +135,19 @@
     return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
 }
 
+RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
+{ // Rebuilds one stored configuration record from its encoded form in buffer.
+    string kind;
+    // The encoded record starts with a short string naming its kind.
+    buffer.getShortString (kind);
+    if (kind == "link")
+    {
+        return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
+    }
+    // Any other kind is not handled yet: the caller receives an empty (null) shared_ptr.
+    return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+}
+
 void RecoveryManagerImpl::recoveryComplete()
 {
     //TODO (finalise binding setup etc)
@@ -185,6 +208,13 @@
     exchange->setPersistenceId(id);
 }
 
+void RecoverableConfigImpl::setPersistenceId(uint64_t id)
+{ // Forwards the id to the held Link; does nothing when no Link is held.
+    if (link.get())
+        link->setPersistenceId(id);
+    // TODO: forward to other config types once they are supported.  Consider using a parent class.
+}
+
 void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
 {
     Queue::shared_ptr queue = queues.find(queueName);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Mon May 12 10:04:07 2008
@@ -25,6 +25,7 @@
 #include "DtxManager.h"
 #include "ExchangeRegistry.h"
 #include "QueueRegistry.h"
+#include "LinkRegistry.h"
 #include "RecoveryManager.h"
 
 namespace qpid {
@@ -33,10 +34,12 @@
     class RecoveryManagerImpl : public RecoveryManager{
         QueueRegistry& queues;
         ExchangeRegistry& exchanges;
+        LinkRegistry& links;
         DtxManager& dtxMgr;
         const uint64_t stagingThreshold;
     public:
-        RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold);
+        RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
+                            DtxManager& dtxMgr, uint64_t stagingThreshold);
         ~RecoveryManagerImpl();
 
         RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
@@ -44,6 +47,7 @@
         RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
         RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, 
                                                               std::auto_ptr<TPCTransactionContext> txn);
+        RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer);
         void recoveryComplete();
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.h Mon May 12 10:04:07 2008
@@ -42,9 +42,6 @@
 
     management::ManagementObject::shared_ptr GetManagementObject (void) const
     { return mgmtObject; }
-
-    management::Manageable::status_t ManagementMethod (uint32_t, management::Args&)
-    { return management::Manageable::STATUS_OK; }
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp Mon May 12 10:04:07 2008
@@ -25,13 +25,19 @@
 {
     switch (status)
     {
-    case STATUS_OK              : return "OK";
-    case STATUS_UNKNOWN_OBJECT  : return "UnknownObject";
-    case STATUS_UNKNOWN_METHOD  : return "UnknownMethod";
-    case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
-    case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+    case STATUS_OK                      : return "OK";
+    case STATUS_UNKNOWN_OBJECT          : return "UnknownObject";
+    case STATUS_UNKNOWN_METHOD          : return "UnknownMethod";
+    case STATUS_NOT_IMPLEMENTED         : return "NotImplemented";
+    case STATUS_INVALID_PARAMETER       : return "InvalidParameter";
+    case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
     }
 
     return "??";
 }
 
+Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&)
+{ // Default for classes that do not override it: both arguments are ignored.
+    return STATUS_UNKNOWN_METHOD; // every method id is reported as unknown
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h?rev=655563&r1=655562&r2=655563&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h Mon May 12 10:04:07 2008
@@ -39,11 +39,12 @@
     typedef uint32_t status_t;
     static std::string StatusText (status_t status);
 
-    static const status_t STATUS_OK                = 0;
-    static const status_t STATUS_UNKNOWN_OBJECT    = 1;
-    static const status_t STATUS_UNKNOWN_METHOD    = 2;
-    static const status_t STATUS_NOT_IMPLEMENTED   = 3;
-    static const status_t STATUS_INVALID_PARAMETER = 4;
+    static const status_t STATUS_OK                      = 0;
+    static const status_t STATUS_UNKNOWN_OBJECT          = 1;
+    static const status_t STATUS_UNKNOWN_METHOD          = 2;
+    static const status_t STATUS_NOT_IMPLEMENTED         = 3;
+    static const status_t STATUS_INVALID_PARAMETER       = 4;
+    static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
 
     //  Every "Manageable" object must hold a reference to exactly one
     //  management object.  This object is always of a class derived from
@@ -58,7 +59,7 @@
     //  on this object.  The input and output arguments are specific to the
     //  method being called and must be down-cast to the appropriate sub class
     //  before use.
-    virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0;
+    virtual status_t ManagementMethod (uint32_t methodId, Args& args);
 };
 
 inline Manageable::~Manageable (void) {}