You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [4/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp Fri Aug  3 12:13:32 2012
@@ -108,7 +108,6 @@ Broker::Options::Options(const std::stri
     noDataDir(0),
     port(DEFAULT_PORT),
     workerThreads(5),
-    maxConnections(500),
     connectionBacklog(10),
     enableMgmt(1),
     mgmtPublish(1),
@@ -128,8 +127,10 @@ Broker::Options::Options(const std::stri
     queueFlowResumeRatio(70),
     queueThresholdEventRatio(80),
     defaultMsgGroup("qpid.no-group"),
-    timestampRcvMsgs(false),     // set the 0.10 timestamp delivery property
-    linkMaintenanceInterval(2)
+    timestampRcvMsgs(false),    // set the 0.10 timestamp delivery property
+    linkMaintenanceInterval(2),
+    linkHeartbeatInterval(120),
+    maxNegotiateTime(2000)      // 2s
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -146,7 +147,6 @@ Broker::Options::Options(const std::stri
         ("no-data-dir", optValue(noDataDir), "Don't use a data directory.  No persistent configuration will be loaded or stored")
         ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
         ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
-        ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
         ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
         ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
@@ -171,6 +171,9 @@ Broker::Options::Options(const std::stri
         ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
         ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
         ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
+        ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
+        ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
+        ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
         ;
 }
 
@@ -208,7 +211,6 @@ Broker::Broker(const Broker::Options& co
     inCluster(false),
     clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
-    connectionCounter(conf.maxConnections),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
     deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
 {
@@ -227,7 +229,6 @@ Broker::Broker(const Broker::Options& co
         mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
         mgmtObject->set_port(conf.port);
         mgmtObject->set_workerThreads(conf.workerThreads);
-        mgmtObject->set_maxConns(conf.maxConnections);
         mgmtObject->set_connBacklog(conf.connectionBacklog);
         mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
         mgmtObject->set_mgmtPublish(conf.mgmtPublish);
@@ -244,8 +245,11 @@ Broker::Broker(const Broker::Options& co
         // management schema correct.
         Vhost* vhost = new Vhost(this, this);
         vhostObject = Vhost::shared_ptr(vhost);
-        framing::Uuid uuid(managementAgent->getUuid());
-        federationTag = uuid.str();
+        if (conf.fedTag.empty()) {
+            framing::Uuid uuid(managementAgent->getUuid());
+            federationTag = uuid.str();
+        } else
+            federationTag = conf.fedTag;
         vhostObject->setFederationTag(federationTag);
 
         queues.setParent(vhost);
@@ -254,8 +258,11 @@ Broker::Broker(const Broker::Options& co
     } else {
         // Management is disabled so there is no broker management ID.
         // Create a unique uuid to use as the federation tag.
-        framing::Uuid uuid(true);
-        federationTag = uuid.str();
+        if (conf.fedTag.empty()) {
+            framing::Uuid uuid(true);
+            federationTag = uuid.str();
+        } else
+            federationTag = conf.fedTag;
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);
@@ -346,7 +353,7 @@ Broker::Broker(const Broker::Options& co
         knownBrokers.push_back(Url(conf.knownHosts));
     }
 
-    } catch (const std::exception& /*e*/) {
+    } catch (const std::exception&) {
         finalize();
         throw;
     }
@@ -443,7 +450,7 @@ Manageable* Broker::GetVhostObject(void)
 
 Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
                                                Args&    args,
-                                               string&)
+                                               string&  text)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 
@@ -458,6 +465,14 @@ Manageable::status_t Broker::ManagementM
         status = Manageable::STATUS_OK;
         break;
     case _qmf::Broker::METHOD_CONNECT : {
+        /** Management is creating a Link to a remote broker using the host and port of
+         * the remote.  This (old) interface does not allow management to specify a name
+         * for the link, nor does it allow multiple Links to the same remote.  Use the
+         * "create()" broker method if these features are needed.
+         * TBD: deprecate this interface.
+         */
+        QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
+                 " Please use the Broker::create() method with type='link' instead.");
         _qmf::ArgsBrokerConnect& hp=
             dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
 
@@ -466,13 +481,24 @@ Manageable::status_t Broker::ManagementM
                         "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
         if (!getProtocolFactory(transport)) {
             QPID_LOG(error, "Transport '" << transport << "' not supported");
+            text = "transport type not supported";
             return  Manageable::STATUS_NOT_IMPLEMENTED;
         }
-        std::pair<Link::shared_ptr, bool> response =
-            links.declare (hp.i_host, hp.i_port, transport, hp.i_durable,
-                           hp.i_authMechanism, hp.i_username, hp.i_password);
-        if (hp.i_durable && response.second)
-            store->create(*response.first);
+
+        // Does a link to the remote already exist?  If so, re-use the existing link
+        // - this behavior is backward compatible with previous releases.
+        if (!links.getLink(hp.i_host, hp.i_port, transport)) {
+            // new link, need to generate a unique name for it
+            std::pair<Link::shared_ptr, bool> response =
+              links.declare(Link::createName(transport, hp.i_host, hp.i_port),
+                            hp.i_host, hp.i_port, transport,
+                            hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
+            if (!response.first) {
+                text = "Unable to create Link";
+                status = Manageable::STATUS_PARAMETER_INVALID;
+                break;
+            }
+        }
         status = Manageable::STATUS_OK;
         break;
       }
@@ -543,6 +569,8 @@ const std::string TYPE_QUEUE("queue");
 const std::string TYPE_EXCHANGE("exchange");
 const std::string TYPE_TOPIC("topic");
 const std::string TYPE_BINDING("binding");
+const std::string TYPE_LINK("link");
+const std::string TYPE_BRIDGE("bridge");
 const std::string DURABLE("durable");
 const std::string AUTO_DELETE("auto-delete");
 const std::string ALTERNATE_EXCHANGE("alternate-exchange");
@@ -554,6 +582,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_
 
 const std::string _TRUE("true");
 const std::string _FALSE("false");
+
+// parameters for creating a Link object, see mgmt schema
+const std::string HOST("host");
+const std::string PORT("port");
+const std::string TRANSPORT("transport");
+const std::string AUTH_MECHANISM("authMechanism");
+const std::string USERNAME("username");
+const std::string PASSWORD("password");
+
+// parameters for creating a Bridge object, see mgmt schema
+const std::string LINK("link");
+const std::string SRC("src");
+const std::string DEST("dest");
+const std::string KEY("key");
+const std::string TAG("tag");
+const std::string EXCLUDES("excludes");
+const std::string SRC_IS_QUEUE("srcIsQueue");
+const std::string SRC_IS_LOCAL("srcIsLocal");
+const std::string DYNAMIC("dynamic");
+const std::string SYNC("sync");
 }
 
 struct InvalidBindingIdentifier : public qpid::Exception
@@ -603,6 +651,25 @@ struct UnknownObjectType : public qpid::
     std::string getPrefix() const { return "unknown object type"; }
 };
 
+struct ReservedObjectName : public qpid::Exception  // thrown by createObject when a link/bridge name starts with QPID_NAME_PREFIX
+{
+    ReservedObjectName(const std::string& type) : qpid::Exception(type) {}  // argument is forwarded to qpid::Exception; the visible callers pass the rejected object name
+    std::string getPrefix() const { return std::string("names prefixed with '")
+          + QPID_NAME_PREFIX + std::string("' are reserved"); }  // result: "names prefixed with '<QPID_NAME_PREFIX>' are reserved"
+};
+
+struct UnsupportedTransport : public qpid::Exception  // thrown by createObject when getProtocolFactory(transport) evaluates false
+{
+    UnsupportedTransport(const std::string& type) : qpid::Exception(type) {}  // argument is forwarded to qpid::Exception; the visible caller passes the transport name
+    std::string getPrefix() const { return "transport is not supported"; }  // fixed prefix text for this exception type
+};
+
+struct InvalidParameter : public qpid::Exception  // thrown by createObject when links.declare() returns no object or a bridge's link is not found
+{
+    InvalidParameter(const std::string& type) : qpid::Exception(type) {}  // argument is forwarded to qpid::Exception; the visible callers pass the object name
+    std::string getPrefix() const { return "invalid parameter to method call"; }  // fixed prefix text for this exception type
+};
+
 void Broker::createObject(const std::string& type, const std::string& name,
                           const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
 {
@@ -674,6 +741,113 @@ void Broker::createObject(const std::str
         amqp_0_10::translate(extensions, arguments);
 
         bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+
+    } else if (type == TYPE_LINK) {
+
+        QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
+
+        if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+            QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+            throw ReservedObjectName(name);
+        }
+
+        std::string host;
+        uint16_t port = 0;
+        std::string transport = TCP_TRANSPORT;
+        bool durable = false;
+        std::string authMech, username, password;
+
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            if (i->first == HOST) host = i->second.asString();
+            else if (i->first == PORT) port = i->second.asUint16();
+            else if (i->first == TRANSPORT) transport = i->second.asString();
+            else if (i->first == DURABLE) durable = bool(i->second);
+            else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
+            else if (i->first == USERNAME) username = i->second.asString();
+            else if (i->first == PASSWORD) password = i->second.asString();
+            else {
+                // TODO: strict checking here
+            }
+        }
+
+        if (!getProtocolFactory(transport)) {
+            QPID_LOG(error, "Transport '" << transport << "' not supported.");
+            throw UnsupportedTransport(transport);
+        }
+
+        std::pair<boost::shared_ptr<Link>, bool> rc;
+        rc = links.declare(name, host, port, transport, durable, authMech, username, password);
+        if (!rc.first) {
+            QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
+                      "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
+            throw InvalidParameter(name);
+        }
+        if (!rc.second) {
+            QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
+            throw ObjectAlreadyExists(name);
+        }
+
+    } else if (type == TYPE_BRIDGE) {
+
+        QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
+
+        if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+            QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+            throw ReservedObjectName(name);
+        }
+
+        std::string linkName;
+        std::string src;
+        std::string dest;
+        std::string key;
+        std::string id;
+        std::string excludes;
+        std::string queueName;
+        bool durable = false;
+        bool srcIsQueue = false;
+        bool srcIsLocal = false;
+        bool dynamic = false;
+        uint16_t sync = 0;
+
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+
+            if (i->first == LINK) linkName = i->second.asString();
+            else if (i->first == SRC) src = i->second.asString();
+            else if (i->first == DEST) dest = i->second.asString();
+            else if (i->first == KEY) key = i->second.asString();
+            else if (i->first == TAG) id = i->second.asString();
+            else if (i->first == EXCLUDES) excludes = i->second.asString();
+            else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
+            else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
+            else if (i->first == DYNAMIC) dynamic = bool(i->second);
+            else if (i->first == SYNC) sync = i->second.asUint16();
+            else if (i->first == DURABLE) durable = bool(i->second);
+            else if (i->first == QUEUE_NAME) queueName = i->second.asString();
+            else {
+                // TODO: strict checking here
+            }
+        }
+
+        boost::shared_ptr<Link> link;
+        if (linkName.empty() || !(link = links.getLink(linkName))) {
+            QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
+            throw InvalidParameter(name);
+        }
+        std::pair<Bridge::shared_ptr, bool> rc =
+          links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
+                        dynamic, sync,
+                        0,
+                        queueName);
+
+        if (!rc.first) {
+            QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
+                      "; src=" << src << "; dest=" << dest << "; key=" << key);
+            throw InvalidParameter(name);
+        }
+        if (!rc.second) {
+            QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
+            throw ObjectAlreadyExists(name);
+        }
     } else {
         throw UnknownObjectType(type);
     }
@@ -696,6 +870,16 @@ void Broker::deleteObject(const std::str
     } else if (type == TYPE_BINDING) {
         BindingIdentifier binding(name);
         unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+    } else if (type == TYPE_LINK) {
+        boost::shared_ptr<Link> link = links.getLink(name);
+        if (link) {
+            link->close();
+        }
+    } else if (type == TYPE_BRIDGE) {
+        boost::shared_ptr<Bridge> bridge = links.getBridge(name);
+        if (bridge) {
+            bridge->close();
+        }
     } else {
         throw UnknownObjectType(type);
     }
@@ -920,6 +1104,13 @@ std::pair<boost::shared_ptr<Queue>, bool
                                         ManagementAgent::toMap(arguments),
                                         "created"));
         }
+        QPID_LOG_CAT(debug, model, "Create queue. name:" << name
+            << " user:" << userId
+            << " rhost:" << connectionId
+            << " durable:" << (durable ? "T" : "F")
+            << " owner:" << owner
+            << " autodelete:" << (autodelete ? "T" : "F")
+            << " alternateExchange:" << alternateExchange );
     }
     return result;
 }
@@ -942,6 +1133,10 @@ void Broker::deleteQueue(const std::stri
 
     if (managementAgent.get())
         managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+    QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
+        << " user:" << userId
+        << " rhost:" << connectionId
+    );
 
 }
 
@@ -993,6 +1188,12 @@ std::pair<Exchange::shared_ptr, bool> Br
                                                          ManagementAgent::toMap(arguments),
                                                          "created"));
         }
+        QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
+            << " user:" << userId
+            << " rhost:" << connectionId
+            << " type:" << type
+            << " alternateExchange:" << alternateExchange
+            << " durable:" << (durable ? "T" : "F"));
     }
     return result;
 }
@@ -1017,7 +1218,9 @@ void Broker::deleteExchange(const std::s
 
     if (managementAgent.get())
         managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-
+    QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
+        << " user:" << userId
+        << " rhost:" << connectionId);
 }
 
 void Broker::bind(const std::string& queueName,
@@ -1047,10 +1250,16 @@ void Broker::bind(const std::string& que
         throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
     } else {
         if (queue->bind(exchange, key, arguments)) {
+            getConfigurationObservers().bind(exchange, queue, key, arguments);
             if (managementAgent.get()) {
                 managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
                                                   queueName, key, ManagementAgent::toMap(arguments)));
             }
+            QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
+                << " queue:" << queueName
+                << " key:" << key
+                << " user:" << userId
+                << " rhost:" << connectionId);
         }
     }
 }
@@ -1082,12 +1291,33 @@ void Broker::unbind(const std::string& q
             if (exchange->isDurable() && queue->isDurable()) {
                 store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
             }
+            getConfigurationObservers().unbind(
+                exchange, queue, key, framing::FieldTable());
             if (managementAgent.get()) {
                 managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
             }
+            QPID_LOG_CAT(debug, model, "Delete binding. exchange:" << exchangeName
+                << " queue:" << queueName
+                << " key:" << key
+                << " user:" << userId
+                << " rhost:" << connectionId);
         }
     }
 }
 
+// FIXME aconway 2012-04-27: access to linkClientProperties is
+// not properly thread safe, you could lose fields if 2 threads
+// attempt to add a field concurrently.
+
+framing::FieldTable Broker::getLinkClientProperties() const {  // returns the table by value (a copy), not a reference to the member
+    sys::Mutex::ScopedLock l(linkClientPropertiesLock);  // lock is declared mutable in Broker.h, so it can be taken in this const method
+    return linkClientProperties;  // the returned copy is made while the scoped lock is still held
+}
+
+void Broker::setLinkClientProperties(const framing::FieldTable& ft) {  // replaces the whole stored table with ft
+    sys::Mutex::ScopedLock l(linkClientPropertiesLock);  // same lock that getLinkClientProperties() takes
+    linkClientProperties = ft;  // whole-table assignment; a caller's get-modify-set sequence is not covered by this lock
+}
+
 }} // namespace qpid::broker
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h Fri Aug  3 12:13:32 2012
@@ -40,6 +40,7 @@
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/broker/ConsumerFactory.h"
 #include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/ConfigurationObservers.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -64,8 +65,8 @@
 namespace qpid {
 
 namespace sys {
-    class ProtocolFactory;
-    class Poller;
+class ProtocolFactory;
+class Poller;
 }
 
 struct Url;
@@ -91,7 +92,7 @@ class Broker : public sys::Runnable, pub
                public management::Manageable,
                public RefCounted
 {
-public:
+  public:
 
     struct Options : public qpid::Options {
         static const std::string DEFAULT_DATA_DIR_LOCATION;
@@ -103,7 +104,6 @@ public:
         std::string dataDir;
         uint16_t port;
         int workerThreads;
-        int maxConnections;
         int connectionBacklog;
         bool enableMgmt;
         bool mgmtPublish;
@@ -127,31 +127,14 @@ public:
         std::string defaultMsgGroup;
         bool timestampRcvMsgs;
         double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values.
+        uint16_t linkHeartbeatInterval;
+        uint32_t maxNegotiateTime;  // Max time in ms for connection with no negotiation
+        std::string fedTag;
 
       private:
         std::string getHome();
     };
 
-    class ConnectionCounter {
-            int maxConnections;
-            int connectionCount;
-            sys::Mutex connectionCountLock;
-        public:
-            ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
-            void inc_connectionCount() {
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
-                connectionCount++;
-            }
-            void dec_connectionCount() {
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
-                connectionCount--;
-            }
-            bool allowConnection() {
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
-                return (maxConnections <= connectionCount);
-            }
-    };
-
   private:
     typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
 
@@ -183,6 +166,7 @@ public:
     AclModule* acl;
     DataDir dataDir;
     ConnectionObservers connectionObservers;
+    ConfigurationObservers configurationObservers;
 
     QueueRegistry queues;
     ExchangeRegistry exchanges;
@@ -203,9 +187,11 @@ public:
     bool recovery;
     bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    ConnectionCounter connectionCounter;
     ConsumerFactories consumerFactories;
 
+    mutable sys::Mutex linkClientPropertiesLock;
+    framing::FieldTable linkClientProperties;
+
   public:
     QPID_BROKER_EXTERN virtual ~Broker();
 
@@ -317,8 +303,6 @@ public:
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
 
-    ConnectionCounter& getConnectionCounter() {return connectionCounter;}
-
     /**
      * Never true in a stand-alone broker. In a cluster, return true
      * to defer delivery of messages deliveredg in a cluster-unsafe
@@ -377,6 +361,14 @@ public:
 
     ConsumerFactories&  getConsumerFactories() { return consumerFactories; }
     ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+    ConfigurationObservers& getConfigurationObservers() { return configurationObservers; }
+
+    /** Properties to be set on outgoing link connections */
+    QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
+    QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+
+    /** Information identifying this system */
+    boost::shared_ptr<const System> getSystem() const { return systemObject; }
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp Fri Aug  3 12:13:32 2012
@@ -43,7 +43,7 @@
 #include <iostream>
 #include <assert.h>
 
-
+using std::string;
 
 using namespace qpid::sys;
 using namespace qpid::framing;
@@ -87,10 +87,14 @@ Connection::Connection(ConnectionOutputH
                        bool link_,
                        uint64_t objectId_,
                        bool shadow_,
-                       bool delayManagement) :
+                       bool delayManagement,
+                       bool authenticated_
+) :
     ConnectionState(out_, broker_),
     securitySettings(external),
-    adapter(*this, link_, shadow_),
+    shadow(shadow_),
+    authenticated(authenticated_),
+    adapter(*this, link_),
     link(link_),
     mgmtClosing(false),
     mgmtId(mgmtId_),
@@ -100,14 +104,12 @@ Connection::Connection(ConnectionOutputH
     timer(broker_.getTimer()),
     errorListener(0),
     objectId(objectId_),
-    shadow(shadow_),
     outboundTracker(*this)
 {
     outboundTracker.wrap(out);
     broker.getConnectionObservers().connection(*this);
     // In a cluster, allow adding the management object to be delayed.
     if (!delayManagement) addManagementObject();
-    if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
 }
 
 void Connection::addManagementObject() {
@@ -141,6 +143,8 @@ Connection::~Connection()
         // a cluster-unsafe context. Don't raise an event in that case.
         if (!link && isClusterSafe())
             agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+        QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
+            << " rhost:" << mgmtId );
     }
     broker.getConnectionObservers().closed(*this);
 
@@ -148,8 +152,9 @@ Connection::~Connection()
         heartbeatTimer->cancel();
     if (timeoutTimer)
         timeoutTimer->cancel();
-
-    if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
+    if (linkHeartbeatTimer) {
+        linkHeartbeatTimer->cancel();
+    }
 }
 
 void Connection::received(framing::AMQFrame& frame) {
@@ -284,6 +289,10 @@ void Connection::raiseConnectEvent() {
         mgmtObject->set_authIdentity(userId);
         agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
     }
+
+    QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
+        << " rhost:" << mgmtId );
+
 }
 
 void Connection::setUserProxyAuth(bool b)
@@ -300,6 +309,9 @@ void Connection::close(connection::Close
         heartbeatTimer->cancel();
     if (timeoutTimer)
         timeoutTimer->cancel();
+    if (linkHeartbeatTimer) {
+        linkHeartbeatTimer->cancel();
+    }
     adapter.close(code, text);
     //make sure we delete dangling pointers from outputTasks before deleting sessions
     outputTasks.removeAll();
@@ -313,6 +325,9 @@ void Connection::sendClose() {
         heartbeatTimer->cancel();
     if (timeoutTimer)
         timeoutTimer->cancel();
+    if (linkHeartbeatTimer) {
+        linkHeartbeatTimer->cancel();
+    }
     adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
     getOutput().close();
 }
@@ -326,6 +341,9 @@ void Connection::closed(){ // Physically
         heartbeatTimer->cancel();
     if (timeoutTimer)
         timeoutTimer->cancel();
+    if (linkHeartbeatTimer) {
+        linkHeartbeatTimer->cancel();
+    }
     try {
         while (!channels.empty())
             ptr_map_ptr(channels.begin())->handleDetach();
@@ -435,6 +453,31 @@ struct ConnectionHeartbeatTask : public 
     }
 };
 
+class LinkHeartbeatTask : public qpid::sys::TimerTask {  // timer task: aborts the connection if heartbeatReceived() was not called since the previous firing
+    sys::Timer& timer;  // timer this task re-adds itself to after each successful check
+    Connection& connection;  // connection that is aborted when a check fails
+    bool heartbeatSeen;  // set by heartbeatReceived(); cleared by fire() before re-arming
+
+    void fire() {
+        if (!heartbeatSeen) {
+            QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection");  // the task is created with a period of 2 * heartbeat in startLinkHeartbeatTimeoutTask()
+            connection.abort();  // the task is not re-added on this path
+        } else {
+            heartbeatSeen = false;
+            // Setup next firing
+            setupNextFire();
+            timer.add(this);
+        }
+    }
+
+public:
+    LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) :
+        TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {}  // starts with no heartbeat recorded
+
+    void heartbeatReceived() { heartbeatSeen = true; }  // called from Connection::restartTimeout()
+};
+
+
 void Connection::abort()
 {
     // Make sure that we don't try to send a heartbeat as we're
@@ -460,10 +503,21 @@ void Connection::setHeartbeatInterval(ui
     }
 }
 
+void Connection::startLinkHeartbeatTimeoutTask() {  // creates and schedules the LinkHeartbeatTask; does nothing if one already exists or heartbeat is not positive
+    if (!linkHeartbeatTimer && heartbeat > 0) {
+        linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);  // period is twice the heartbeat value, scaled by TIME_SEC
+        timer.add(linkHeartbeatTimer);
+    }
+}
+
 void Connection::restartTimeout()
 {
     if (timeoutTimer)
         timeoutTimer->touch();
+
+    if (linkHeartbeatTimer) {  // in the visible code this is only assigned by startLinkHeartbeatTimeoutTask()
+        static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived();  // downcast needed: the member is declared as intrusive_ptr<sys::TimerTask>
+    }
 }
 
 bool Connection::isOpen() { return adapter.isOpen(); }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h Fri Aug  3 12:13:32 2012
@@ -27,8 +27,7 @@
 #include <vector>
 #include <queue>
 
-#include <boost/ptr_container/ptr_map.hpp>
-
+#include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/ConnectionHandler.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/SessionHandler.h"
@@ -86,15 +85,22 @@ class Connection : public sys::Connectio
                bool isLink = false,
                uint64_t objectId = 0,
                bool shadow=false,
-               bool delayManagement = false);
+               bool delayManagement = false,
+               bool authenticated=true);
 
     ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already exist */
     SessionHandler& getChannel(framing::ChannelId channel);
 
-    /** Close the connection */
-    void close(framing::connection::CloseCode code, const std::string& text);
+    /** Close the connection. Waits for the client to respond with close-ok
+     * before actually destroying the connection.
+     */
+    QPID_BROKER_EXTERN void close(
+        framing::connection::CloseCode code, const std::string& text);
+
+    /** Abort the connection. Close abruptly and immediately. */
+    QPID_BROKER_EXTERN void abort();
 
     // ConnectionInputHandler methods
     void received(framing::AMQFrame& frame);
@@ -138,8 +144,7 @@ class Connection : public sys::Connectio
     void setHeartbeatInterval(uint16_t heartbeat);
     void sendHeartbeat();
     void restartTimeout();
-    void abort();
-
+    
     template <class F> void eachSessionHandler(F f) {
         for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
             f(*ptr_map_ptr(i));
@@ -149,7 +154,10 @@ class Connection : public sys::Connectio
     void setSecureConnection(SecureConnection* secured);
 
     /** True if this is a shadow connection in a cluster. */
-    bool isShadow() { return shadow; }
+    bool isShadow() const { return shadow; }
+
+    /** True if this connection is authenticated */
+    bool isAuthenticated() const { return authenticated; }
 
     // Used by cluster to update connection status
     sys::AggregateOutput& getOutputTasks() { return outputTasks; }
@@ -166,6 +174,7 @@ class Connection : public sys::Connectio
     bool isOpen();
 
     bool isLink() { return link; }
+    void startLinkHeartbeatTimeoutTask();
 
     // Used by cluster during catch-up, see cluster::OutputInterceptor
     void doIoCallbacks();
@@ -179,6 +188,8 @@ class Connection : public sys::Connectio
 
     ChannelMap channels;
     qpid::sys::SecuritySettings securitySettings;
+    bool shadow;
+    bool authenticated;
     ConnectionHandler adapter;
     const bool link;
     bool mgmtClosing;
@@ -189,11 +200,10 @@ class Connection : public sys::Connectio
     LinkRegistry& links;
     management::ManagementAgent* agent;
     sys::Timer& timer;
-    boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
+    boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
     boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
     ErrorListener* errorListener;
     uint64_t objectId;
-    bool shadow;
     framing::FieldTable clientProperties;
 
     /**

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp Fri Aug  3 12:13:32 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory() 
 sys::ConnectionCodec*
 ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
                           const SecuritySettings& external) {
-    if (broker.getConnectionCounter().allowConnection())
-    {
-        QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
-        return 0;
-    }
     if (v == ProtocolVersion(0, 10)) {
         ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
         c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
@@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputCon
     return c.release();
 }
 
-    
+
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Aug  3 12:13:32 2012
@@ -36,6 +36,9 @@
 
 using namespace qpid;
 using namespace qpid::broker;
+
+using std::string;
+
 using namespace qpid::framing;
 using qpid::sys::SecurityLayer;
 namespace _qmf = qmf::org::apache::qpid::broker;
@@ -103,9 +106,10 @@ void ConnectionHandler::setSecureConnect
     handler->secured = secured;
 }
 
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool isShadow)  : handler(new Handler(connection, isClient, isShadow)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient)  :
+    handler(new Handler(connection, isClient)) {}
 
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
     proxy(c.getOutput()),
     connection(c), serverMode(!isClient), secured(0),
     isOpen(false)
@@ -116,14 +120,13 @@ ConnectionHandler::Handler::Handler(Conn
 
         properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
 
-        authenticator = SaslAuthenticator::createAuthenticator(c, isShadow);
+        authenticator = SaslAuthenticator::createAuthenticator(c);
         authenticator->getMechanisms(mechanisms);
 
         Array locales(0x95);
         boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
         locales.add(l);
         proxy.start(properties, mechanisms, locales);
-        
     }
 
     maxFrameSize = (64 * 1024) - 1;
@@ -149,12 +152,20 @@ void ConnectionHandler::Handler::startOk
         authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
     } catch (std::exception& /*e*/) {
         management::ManagementAgent* agent = connection.getAgent();
-        if (agent) {
+        bool logEnabled;
+        QPID_LOG_TEST_CAT(debug, model, logEnabled);
+        if (logEnabled || agent)
+        {
             string error;
             string uid;
             authenticator->getError(error);
             authenticator->getUid(uid);
-            agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+            if (agent) {
+                agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+            }
+            QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
+                << " user:" << uid
+                << " reason:" << error );
         }
         throw;
     }
@@ -169,7 +180,9 @@ void ConnectionHandler::Handler::startOk
         AclModule* acl =  connection.getBroker().getAcl();
         FieldTable properties;
     	if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
-            proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
+            proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+                        QPID_MSG("ACL denied " << connection.getUserId()
+                                 << " creating a federation link"));
             return;
         }
         QPID_LOG(info, "Connection is a federation link");
@@ -195,12 +208,20 @@ void ConnectionHandler::Handler::secureO
         authenticator->step(response);
     } catch (std::exception& /*e*/) {
         management::ManagementAgent* agent = connection.getAgent();
-        if (agent) {
+        bool logEnabled;
+        QPID_LOG_TEST_CAT(debug, model, logEnabled);
+        if (logEnabled || agent)
+        {
             string error;
             string uid;
             authenticator->getError(error);
             authenticator->getUid(uid);
-            agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+            if (agent) {
+                agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+            }
+            QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
+                << " user:" << uid
+                << " reason:" << error );
         }
         throw;
     }
@@ -278,7 +299,7 @@ void ConnectionHandler::Handler::start(c
                                                   service,
                                                   host,
                                                   0,   // TODO -- mgoulish Fri Sep 24 2010
-                                                  256,  
+                                                  256,
                                                   false ); // disallow interaction
     }
     std::string supportedMechanismsList;
@@ -318,7 +339,7 @@ void ConnectionHandler::Handler::start(c
         connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
     }
 
-    FieldTable ft;
+    FieldTable ft = connection.getBroker().getLinkClientProperties();
     ft.setInt(QPID_FED_LINK,1);
     ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
 
@@ -367,8 +388,14 @@ void ConnectionHandler::Handler::tune(ui
     maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
     connection.setFrameMax(maxFrameSize);
 
-    connection.setHeartbeat(heartbeatMax);
-    proxy.tuneOk(channelMax, maxFrameSize, heartbeatMax);
+    // this method is only ever called when this Connection
+    // is a federation link where this Broker is acting as
+    // a client to another Broker
+    uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax);
+    connection.setHeartbeat(hb);
+    connection.startLinkHeartbeatTimeoutTask();
+
+    proxy.tuneOk(channelMax, maxFrameSize, hb);
     proxy.open("/", Array(), true);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h Fri Aug  3 12:13:32 2012
@@ -61,7 +61,7 @@ class ConnectionHandler : public framing
         SecureConnection* secured;
         bool isOpen;
 
-        Handler(Connection& connection, bool isClient, bool isShadow=false);
+        Handler(Connection& connection, bool isClient);
         ~Handler();
         void startOk(const qpid::framing::ConnectionStartOkBody& body);
         void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -99,7 +99,7 @@ class ConnectionHandler : public framing
 
     bool handle(const qpid::framing::AMQMethodBody& method);
   public:
-    ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
+    ConnectionHandler(Connection& connection, bool isClient );
     void close(framing::connection::CloseCode code, const std::string& text);
     void heartbeat();
     void handle(framing::AMQFrame& frame);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h Fri Aug  3 12:13:32 2012
@@ -23,9 +23,7 @@
  */
 
 #include "ConnectionObserver.h"
-#include "qpid/sys/Mutex.h"
-#include <set>
-#include <algorithm>
+#include "Observers.h"
 
 namespace qpid {
 namespace broker {
@@ -35,18 +33,10 @@ namespace broker {
  * Calling a ConnectionObserver function will call that function on each observer.
  * THREAD SAFE.
  */
-class ConnectionObservers : public ConnectionObserver {
+class ConnectionObservers : public ConnectionObserver,
+                            public Observers<ConnectionObserver>
+{
   public:
-    void add(boost::shared_ptr<ConnectionObserver> observer) {
-        sys::Mutex::ScopedLock l(lock);
-        observers.insert(observer);
-    }
-
-    void remove(boost::shared_ptr<ConnectionObserver> observer) {
-        sys::Mutex::ScopedLock l(lock);
-        observers.erase(observer);
-    }
-
     void connection(Connection& c) {
         each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
     }
@@ -62,16 +52,6 @@ class ConnectionObservers : public Conne
     void forced(Connection& c, const std::string& text) {
         each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text));
     }
-
-  private:
-    typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
-    sys::Mutex lock;
-    Observers observers;
-
-    template <class F> void each(F f) {
-        sys::Mutex::ScopedLock l(lock);
-        std::for_each(observers.begin(), observers.end(), f);
-    }
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h Fri Aug  3 12:13:32 2012
@@ -54,7 +54,9 @@ class Consumer
     bool preAcquires() const { return acquires; }
     const std::string& getName() const { return name; }
 
+    /**@return the position of the last message seen by this consumer */
     virtual framing::SequenceNumber getPosition() const  { return position; }
+
     virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
 
     virtual bool deliver(QueuedMessage& msg) = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h Fri Aug  3 12:13:32 2012
@@ -74,7 +74,6 @@ class Daemon : private boost::noncopyabl
 
     pid_t pid;
     int pipeFds[2];
-    int lockFileFd;
     std::string lockFile;
     std::string pidDir;
 };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp Fri Aug  3 12:13:32 2012
@@ -26,6 +26,9 @@
 #include <iostream>
 
 using namespace qpid::broker;
+
+using std::string;
+
 using namespace qpid::framing;
 using namespace qpid::sys;
 using qpid::management::Manageable;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp Fri Aug  3 12:13:32 2012
@@ -35,6 +35,8 @@
 namespace qpid {
 namespace broker {
 
+using std::string;
+
 using namespace qpid::framing;
 using qpid::framing::Buffer;
 using qpid::framing::FieldTable;
@@ -167,7 +169,7 @@ void Exchange::routeIVE(){
 
 
 Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
-    name(_name), durable(false), persistenceId(0), sequence(false),
+    name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
     sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
 {
     if (parent != 0 && broker != 0)

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Fri Aug  3 12:13:32 2012
@@ -174,8 +174,9 @@ public:
     bool isDurable() { return durable; }
     qpid::framing::FieldTable& getArgs() { return args; }
 
-    Exchange::shared_ptr getAlternate() { return alternate; }
-    void setAlternate(Exchange::shared_ptr _alternate);
+    QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
+    QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
+
     void incAlternateUsers() { alternateUsers++; }
     void decAlternateUsers() { alternateUsers--; }
     bool inUseAsAlternate() { return alternateUsers > 0; }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Aug  3 12:13:32 2012
@@ -19,6 +19,7 @@
  *
  */
 
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
@@ -42,38 +43,42 @@ pair<Exchange::shared_ptr, bool> Exchang
 
 pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
                                                            bool durable, const FieldTable& args){
-    RWlock::ScopedWlock locker(lock);
-    ExchangeMap::iterator i =  exchanges.find(name);
-    if (i == exchanges.end()) {
-        Exchange::shared_ptr exchange;
-
-        if (type == TopicExchange::typeName){
-            exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
-        }else if(type == DirectExchange::typeName){
-            exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
-        }else if(type == FanOutExchange::typeName){
-            exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
-        }else if (type == HeadersExchange::typeName) {
-            exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
-        }else if (type == ManagementDirectExchange::typeName) {
-            exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
-        }else if (type == ManagementTopicExchange::typeName) {
-            exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
-        }else if (type == Link::exchangeTypeName) {
-            exchange = Link::linkExchangeFactory(name);
-        }else{
-            FunctionMap::iterator i =  factory.find(type);
-            if (i == factory.end()) {
-                throw UnknownExchangeTypeException();
-            } else {
-                exchange = i->second(name, durable, args, parent, broker);
+    Exchange::shared_ptr exchange;
+    std::pair<Exchange::shared_ptr, bool> result;
+    {
+        RWlock::ScopedWlock locker(lock);
+        ExchangeMap::iterator i =  exchanges.find(name);
+        if (i == exchanges.end()) {
+            if (type == TopicExchange::typeName){
+                exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
+            }else if(type == DirectExchange::typeName){
+                exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
+            }else if(type == FanOutExchange::typeName){
+                exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
+            }else if (type == HeadersExchange::typeName) {
+                exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
+            }else if (type == ManagementDirectExchange::typeName) {
+                exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+            }else if (type == ManagementTopicExchange::typeName) {
+                exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
+            }else if (type == Link::exchangeTypeName) {
+                exchange = Link::linkExchangeFactory(name);
+            }else{
+                FunctionMap::iterator i =  factory.find(type);
+                if (i == factory.end()) {
+                    throw UnknownExchangeTypeException();
+                } else {
+                    exchange = i->second(name, durable, args, parent, broker);
+                }
             }
+            exchanges[name] = exchange;
+            result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+        } else {
+            result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
         }
-        exchanges[name] = exchange;
-        return std::pair<Exchange::shared_ptr, bool>(exchange, true);
-    } else {
-        return std::pair<Exchange::shared_ptr, bool>(i->second, false);
     }
+    if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
+    return result;
 }
 
 void ExchangeRegistry::destroy(const string& name){
@@ -82,12 +87,17 @@ void ExchangeRegistry::destroy(const str
          (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
         name == "qpid.management")
         throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
-    RWlock::ScopedWlock locker(lock);
-    ExchangeMap::iterator i =  exchanges.find(name);
-    if (i != exchanges.end()) {
-        i->second->destroy();
-        exchanges.erase(i);
+    Exchange::shared_ptr exchange;
+    {
+        RWlock::ScopedWlock locker(lock);
+        ExchangeMap::iterator i =  exchanges.find(name);
+        if (i != exchanges.end()) {
+            exchange = i->second;
+            i->second->destroy();
+            exchanges.erase(i);
+        }
     }
+    if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
 }
 
 Exchange::shared_ptr ExchangeRegistry::find(const string& name){

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp Fri Aug  3 12:13:32 2012
@@ -24,6 +24,9 @@
 #include <algorithm>
 
 using namespace qpid::broker;
+
+using std::string;
+
 using namespace qpid::framing;
 using namespace qpid::sys;
 namespace _qmf = qmf::org::apache::qpid::broker;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp Fri Aug  3 12:13:32 2012
@@ -26,6 +26,9 @@
 
 
 using namespace qpid::broker;
+
+using std::string;
+
 using namespace qpid::framing;
 using namespace qpid::sys;
 namespace _qmf = qmf::org::apache::qpid::broker;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp Fri Aug  3 12:13:32 2012
@@ -125,18 +125,20 @@ boost::shared_ptr<Exchange> Link::linkEx
     return Exchange::shared_ptr(new LinkExchange(_name));
 }
 
-Link::Link(LinkRegistry*  _links,
-           MessageStore*  _store,
+Link::Link(const string&  _name,
+           LinkRegistry*  _links,
            const string&        _host,
            uint16_t       _port,
            const string&        _transport,
+           DestroyedListener    l,
            bool           _durable,
            const string&        _authMechanism,
            const string&        _username,
            const string&        _password,
            Broker*        _broker,
-           Manageable*    parent)
-    : links(_links), store(_store),
+           Manageable*    parent,
+           bool failover_)
+    : name(_name), links(_links),
       configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
       host(_host), port(_port), transport(_transport),
       durable(_durable),
@@ -149,7 +151,9 @@ Link::Link(LinkRegistry*  _links,
       channelCounter(1),
       connection(0),
       agent(0),
+      listener(l),
       timerTask(new LinkTimerTask(*this, broker->getTimer())),
+      failover(failover_),
       failoverChannel(0)
 {
     if (parent != 0 && broker != 0)
@@ -157,7 +161,10 @@ Link::Link(LinkRegistry*  _links,
         agent = broker->getManagementAgent();
         if (agent != 0)
         {
-            mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+            mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+            mgmtObject->set_host(host);
+            mgmtObject->set_port(port);
+            mgmtObject->set_transport(transport);
             agent->addObject(mgmtObject, 0, durable);
         }
     }
@@ -169,13 +176,15 @@ Link::Link(LinkRegistry*  _links,
     }
     broker->getTimer().add(timerTask);
 
-    stringstream _name;
-    _name << "qpid.link." << transport << ":" << host << ":" << port;
-    std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
-                                                                              exchangeTypeName);
-    failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
-    assert(failoverExchange);
-    failoverExchange->setLink(this);
+    if (failover) {
+        stringstream exchangeName;
+        exchangeName << "qpid.link." << name;
+        std::pair<Exchange::shared_ptr, bool> rc =
+            broker->getExchanges().declare(exchangeName.str(), exchangeTypeName);
+        failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+        assert(failoverExchange);
+        failoverExchange->setLink(this);
+    }
 }
 
 Link::~Link ()
@@ -187,7 +196,8 @@ Link::~Link ()
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
 
-    broker->getExchanges().destroy(failoverExchange->getName());
+    if (failover)
+        broker->getExchanges().destroy(failoverExchange->getName());
 }
 
 void Link::setStateLH (int newState)
@@ -239,16 +249,19 @@ void Link::established(Connection* c)
 
     if (!hideManagement() && agent)
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
-
-    Mutex::ScopedLock mutex(lock);
-    setStateLH(STATE_OPERATIONAL);
-    currentInterval = 1;
-    visitCount      = 0;
-    connection = c;
-    if (closing)
+    bool isClosing = false;
+    {
+        Mutex::ScopedLock mutex(lock);
+        setStateLH(STATE_OPERATIONAL);
+        currentInterval = 1;
+        visitCount      = 0;
+        connection = c;
+        isClosing = closing;
+    }
+    if (isClosing)
         destroy();
     else // Process any IO tasks bridges added before established.
-        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+        c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 
@@ -261,16 +274,26 @@ void Link::setUrl(const Url& u) {
 
 
 namespace {
-    /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
-    void sessionDetached(Link *link) {
-        QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
-    }
+class DetachedCallback : public SessionHandler::ErrorListener {
+  public:
+    DetachedCallback(const Link& link) : name(link.getName()) {}
+    void connectionException(framing::connection::CloseCode, const std::string&) {}
+    void channelException(framing::session::DetachCode, const std::string&) {}
+    void executionException(framing::execution::ErrorCode, const std::string&) {}
+    void detach() {}
+  private:
+    const std::string name;
+};
 }
 
-
 void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     if (!connection) return;
+
+    if (!hideManagement() && connection->GetManagementObject()) {
+        mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+    }
+
     // Get default URL from known-hosts if not already set
     if (url.empty()) {
         const std::vector<Url>& known = connection->getKnownHosts();
@@ -282,80 +305,82 @@ void Link::opened() {
         QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
     }
 
-    //
-    // attempt to subscribe to failover exchange for updates from remote
-    //
-
-    const std::string queueName = "qpid.link." + framing::Uuid(true).str();
-    failoverChannel = nextChannel();
-
-    SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
-    sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
-    failoverSession = queueName;
-    sessionHandler.attachAs(failoverSession);
-
-    framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-
-    remoteBroker.getQueue().declare(queueName,
-                                    "",         // alt-exchange
-                                    false,      // passive
-                                    false,      // durable
-                                    true,       // exclusive
-                                    true,       // auto-delete
-                                    FieldTable());
-    remoteBroker.getExchange().bind(queueName,
-                                    FAILOVER_EXCHANGE,
-                                    "",     // no key
-                                    FieldTable());
-    remoteBroker.getMessage().subscribe(queueName,
-                                        failoverExchange->getName(),
-                                        1,           // implied-accept mode
-                                        0,           // pre-acquire mode
-                                        false,       // exclusive
-                                        "",          // resume-id
-                                        0,           // resume-ttl
+    if (failover) {
+        //
+        // attempt to subscribe to failover exchange for updates from remote
+        //
+
+        const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+        failoverChannel = nextChannel();
+
+        SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+        sessionHandler.setErrorListener(
+            boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
+        failoverSession = queueName;
+        sessionHandler.attachAs(failoverSession);
+
+        framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+        remoteBroker.getQueue().declare(queueName,
+                                        "",         // alt-exchange
+                                        false,      // passive
+                                        false,      // durable
+                                        true,       // exclusive
+                                        true,       // auto-delete
                                         FieldTable());
-    remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
-    remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+        remoteBroker.getExchange().bind(queueName,
+                                        FAILOVER_EXCHANGE,
+                                        "",     // no key
+                                        FieldTable());
+        remoteBroker.getMessage().subscribe(queueName,
+                                            failoverExchange->getName(),
+                                            1,           // implied-accept mode
+                                            0,           // pre-acquire mode
+                                            false,       // exclusive
+                                            "",          // resume-id
+                                            0,           // resume-ttl
+                                            FieldTable());
+        remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+        remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+    }
 }
 
 void Link::closed(int, std::string text)
 {
-    bool isClosing = false;
-    {
-        Mutex::ScopedLock mutex(lock);
-        QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
+    Mutex::ScopedLock mutex(lock);
+    QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
 
-        connection = 0;
-        if (state == STATE_OPERATIONAL) {
+    connection = 0;
+
+    if (!hideManagement()) {
+        mgmtObject->set_connectionRef(qpid::management::ObjectId());
+        if (state == STATE_OPERATIONAL && agent) {
             stringstream addr;
             addr << host << ":" << port;
-            if (!hideManagement() && agent)
-                agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+            agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
         }
+    }
 
-        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-            (*i)->closed();
-            created.push_back(*i);
-        }
-        active.clear();
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        (*i)->closed();
+        created.push_back(*i);
+    }
+    active.clear();
 
-        if (state != STATE_FAILED && state != STATE_PASSIVE)
-        {
-            setStateLH(STATE_WAITING);
-            if (!hideManagement())
-                mgmtObject->set_lastError (text);
-        }
+    if (state != STATE_FAILED && state != STATE_PASSIVE)
+    {
+        setStateLH(STATE_WAITING);
+        if (!hideManagement())
+            mgmtObject->set_lastError (text);
     }
-    // Call destroy outside of the lock, don't want to be deleted with lock held.
-    if (isClosing)
-        destroy();
 }
 
-// Called in connection IO thread.
+// Called in connection IO thread; cleans up the connection before destroying the Link
 void Link::destroy ()
 {
     Bridges toDelete;
+
+    timerTask->cancel();    // call prior to locking so maintenance visit can finish
     {
         Mutex::ScopedLock mutex(lock);
 
@@ -374,14 +399,13 @@ void Link::destroy ()
         for (Bridges::iterator i = created.begin(); i != created.end(); i++)
             toDelete.push_back(*i);
         created.clear();
-
-        timerTask->cancel();
     }
+
     // Now delete all bridges on this link (don't hold the lock for this).
     for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
-        (*i)->destroy();
+        (*i)->close();
     toDelete.clear();
-    links->destroy (configuredHost, configuredPort);
+    listener(this); // notify LinkRegistry that this Link has been destroyed
 }
 
 void Link::add(Bridge::shared_ptr bridge)
@@ -423,7 +447,7 @@ void Link::ioThreadProcessing()
 {
     Mutex::ScopedLock mutex(lock);
 
-    if (state != STATE_OPERATIONAL)
+    if (state != STATE_OPERATIONAL || closing)
         return;
 
     // check for bridge session errors and recover
@@ -460,7 +484,7 @@ void Link::ioThreadProcessing()
 void Link::maintenanceVisit ()
 {
     Mutex::ScopedLock mutex(lock);
-
+    if (closing) return;
     if (state == STATE_WAITING)
     {
         visitCount++;
@@ -476,21 +500,27 @@ void Link::maintenanceVisit ()
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
+    else if (state == STATE_OPERATIONAL &&
+             (!active.empty() || !created.empty() || !cancellations.empty()) &&
+             connection && connection->isOpen())
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-    }
+}
 
 void Link::reconnectLH(const Address& a)
 {
     host = a.host;
     port = a.port;
     transport = a.protocol;
-    startConnectionLH();
+
     if (!hideManagement()) {
         stringstream errorString;
-        errorString << "Failed over to " << a;
+        errorString << "Failing over to " << a;
         mgmtObject->set_lastError(errorString.str());
+        mgmtObject->set_host(host);
+        mgmtObject->set_port(port);
+        mgmtObject->set_transport(transport);
     }
+    startConnectionLH();
 }
 
 bool Link::tryFailoverLH() {
@@ -499,15 +529,14 @@ bool Link::tryFailoverLH() {
     if (url.empty()) return false;
     Address next = url[reconnectNext++];
     if (next.host != host || next.port != port || next.protocol != transport) {
-        links->changeAddress(Address(transport, host, port), next);
-        QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+        QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next);
         reconnectLH(next);
         return true;
     }
     return false;
 }
 
-// Management updates for a linke are inconsistent in a cluster, so they are
+// Management updates for a link are inconsistent in a cluster, so they are
 // suppressed.
 bool Link::hideManagement() const {
     return !mgmtObject || ( broker && broker->isInCluster());
@@ -536,18 +565,34 @@ void Link::setPersistenceId(uint64_t id)
 
 const string& Link::getName() const
 {
-    return configuredHost;
+    return name;
+}
+
+const std::string Link::ENCODED_IDENTIFIER("link.v2");
+const std::string Link::ENCODED_IDENTIFIER_V1("link");
+
+bool Link::isEncodedLink(const std::string& key)
+{
+    return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
 }
 
 Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
 {
+    string kind;
+    buffer.getShortString(kind);
+
     string   host;
     uint16_t port;
     string   transport;
     string   authMechanism;
     string   username;
     string   password;
+    string   name;
 
+    if (kind == ENCODED_IDENTIFIER) {
+        // newer version provides a link name.
+        buffer.getShortString(name);
+    }
     buffer.getShortString(host);
     port = buffer.getShort();
     buffer.getShortString(transport);
@@ -556,12 +601,21 @@ Link::shared_ptr Link::decode(LinkRegist
     buffer.getShortString(username);
     buffer.getShortString(password);
 
-    return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+    if (kind == ENCODED_IDENTIFIER_V1) {
+        /** previous versions identified the Link by host:port, there was no name
+         * assigned.  So create a name for the new Link.
+         */
+        name = createName(transport, host, port);
+    }
+
+    return links.declare(name, host, port, transport, durable, authMechanism,
+                         username, password).first;
 }
 
 void Link::encode(Buffer& buffer) const
 {
-    buffer.putShortString(string("link"));
+    buffer.putShortString(ENCODED_IDENTIFIER);
+    buffer.putShortString(name);
     buffer.putShortString(configuredHost);
     buffer.putShort(configuredPort);
     buffer.putShortString(configuredTransport);
@@ -573,8 +627,9 @@ void Link::encode(Buffer& buffer) const
 
 uint32_t Link::encodedSize() const
 {
-    return configuredHost.size() + 1 // short-string (host)
-        + 5                // short-string ("link")
+    return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+        + name.size() + 1
+        + configuredHost.size() + 1 // short-string (host)
         + 2                // port
         + configuredTransport.size() + 1 // short-string(transport)
         + 1                // durable
@@ -589,6 +644,7 @@ ManagementObject* Link::GetManagementObj
 }
 
 void Link::close() {
+    QPID_LOG(debug, "Link::close(), link=" << name );
     Mutex::ScopedLock mutex(lock);
     if (!closing) {
         closing = true;
@@ -609,36 +665,31 @@ Manageable::status_t Link::ManagementMet
         return Manageable::STATUS_OK;
 
     case _qmf::Link::METHOD_BRIDGE :
+        /* TBD: deprecate this interface in favor of the Broker::create() method.  The
+         * Broker::create() method allows the user to assign a name to the bridge.
+         */
+        QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID."
+                 " Please use the Broker::create() method with type='bridge' instead.");
         _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
-        QPID_LOG(debug, "Link::bridge() request received");
+        QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
+                 "; dest=" << iargs.i_dest << "; key=" << iargs.i_key);
 
-        // Durable bridges are only valid on durable links
-        if (iargs.i_durable && !durable) {
-            text = "Can't create a durable route on a non-durable link";
-            return Manageable::STATUS_USER;
-        }
-
-        if (iargs.i_dynamic) {
-            Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
-            if (exchange.get() == 0) {
-                text = "Exchange not found";
-                return Manageable::STATUS_USER;
-            }
-            if (!exchange->supportsDynamicBinding()) {
-                text = "Exchange type does not support dynamic routing";
-                return Manageable::STATUS_USER;
+        // Does a bridge already exist that has the src/dest/key?  If so, re-use the
+        // existing bridge - this behavior is backward compatible with previous releases.
+        Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
+        if (!bridge) {
+            // need to create a new bridge on this link.
+            std::pair<Bridge::shared_ptr, bool> rc =
+              links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+                              *this, iargs.i_durable,
+                              iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+                              iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+                              iargs.i_dynamic, iargs.i_sync);
+            if (!rc.first) {
+                text = "invalid parameters";
+                return Manageable::STATUS_PARAMETER_INVALID;
             }
         }
-
-        std::pair<Bridge::shared_ptr, bool> result =
-            links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
-                            iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
-                            iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
-                            iargs.i_dynamic, iargs.i_sync);
-
-        if (result.second && iargs.i_durable)
-            store->create(*result.first);
-
         return Manageable::STATUS_OK;
     }
 
@@ -666,11 +717,13 @@ void Link::closeConnection( const std::s
 {
     if (connection != 0) {
         // cancel our subscription to the failover exchange
-        SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
-        if (sessionHandler.getSession()) {
-            framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-            remoteBroker.getMessage().cancel(failoverExchange->getName());
-            remoteBroker.getSession().detach(failoverSession);
+        if (failover) {
+            SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+            if (sessionHandler.getSession()) {
+                framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+                remoteBroker.getMessage().cancel(failoverExchange->getName());
+                remoteBroker.getSession().detach(failoverSession);
+            }
         }
         connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
         connection = 0;
@@ -716,6 +769,23 @@ void Link::setState(const framing::Field
     }
 }
 
+std::string Link::createName(const std::string& transport,
+                             const std::string& host,
+                             uint16_t  port)
+{
+    stringstream linkName;
+    linkName << QPID_NAME_PREFIX << transport << std::string(":")
+             << host << std::string(":") << port;
+    return linkName.str();
+}
+
+
+bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
+{
+    Mutex::ScopedLock mutex(lock);
+    return (isConnecting() && _port == port && _host == host);
+}
+
 
 const std::string Link::exchangeTypeName("qpid.LinkExchange");
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h Fri Aug  3 12:13:32 2012
@@ -25,7 +25,6 @@
 #include <boost/shared_ptr.hpp>
 #include "qpid/Url.h"
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/MessageStore.h"
 #include "qpid/broker/PersistableConfig.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/BrokerImportExport.h"
@@ -52,8 +51,8 @@ class LinkExchange;
 class Link : public PersistableConfig, public management::Manageable {
   private:
     mutable sys::Mutex  lock;
+    const std::string   name;
     LinkRegistry*       links;
-    MessageStore*       store;
 
     // these remain constant across failover - used to identify this link
     const std::string   configuredTransport;
@@ -64,7 +63,8 @@ class Link : public PersistableConfig, p
     uint16_t            port;
     std::string         transport;
 
-    bool          durable;
+    bool durable;
+
     std::string        authMechanism;
     std::string        username;
     std::string        password;
@@ -85,8 +85,10 @@ class Link : public PersistableConfig, p
     uint channelCounter;
     Connection* connection;
     management::ManagementAgent* agent;
+    boost::function<void(Link*)> listener;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
     boost::shared_ptr<broker::LinkExchange> failoverExchange;  // subscribed to remote's amq.failover exchange
+    bool failover; // Do we subscribe to a failover exchange?
     uint failoverChannel;
     std::string failoverSession;
 
@@ -101,33 +103,39 @@ class Link : public PersistableConfig, p
 
     void setStateLH (int newState);
     void startConnectionLH();        // Start the IO Connection
-    void destroy();                  // Called when mgmt deletes this link
+    void destroy();                  // Cleanup connection before link goes away
     void ioThreadProcessing();       // Called on connection's IO thread by request
     bool tryFailoverLH();            // Called during maintenance visit
     bool hideManagement() const;
+    void reconnectLH(const Address&); //called by LinkRegistry
 
-    void established(Connection*); // Called when connection is create
+    // connection management (called by LinkRegistry)
+    void established(Connection*); // Called when connection is created
     void opened();      // Called when connection is open (after create)
     void closed(int, std::string);   // Called when connection goes away
-    void reconnectLH(const Address&); //called by LinkRegistry
+    void notifyConnectionForced(const std::string text);
     void closeConnection(const std::string& reason);
+    bool pendingConnection(const std::string& host, uint16_t port) const;  // is Link trying to connect to this remote?
 
     friend class LinkRegistry; // to call established, opened, closed
 
   public:
     typedef boost::shared_ptr<Link> shared_ptr;
+    typedef boost::function<void(Link*)> DestroyedListener;
 
-    Link(LinkRegistry* links,
-         MessageStore* store,
+    Link(const std::string&       name,
+         LinkRegistry* links,
          const std::string&       host,
          uint16_t      port,
          const std::string&       transport,
+         DestroyedListener        l,
          bool          durable,
          const std::string&       authMechanism,
          const std::string&       username,
          const std::string&       password,
          Broker*       broker,
-         management::Manageable* parent = 0);
+         management::Manageable* parent = 0,
+         bool failover=true);
     virtual ~Link();
 
     /** these return the *configured* transport/host/port, which does not change over the
@@ -139,7 +147,7 @@ class Link : public PersistableConfig, p
     /** returns the current address of the remote, which may be different from the
         configured transport/host/port due to failover. Returns true if connection is
         active */
-    bool getRemoteAddress(qpid::Address& addr) const;
+    QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;
 
     bool isDurable() { return durable; }
     void maintenanceVisit ();
@@ -148,15 +156,17 @@ class Link : public PersistableConfig, p
     void cancel(Bridge::shared_ptr);
 
     QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
-    QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
+
+    // Close the link.
+    QPID_BROKER_EXTERN void close();
 
     std::string getAuthMechanism() { return authMechanism; }
     std::string getUsername()      { return username; }
     std::string getPassword()      { return password; }
     Broker* getBroker()       { return broker; }
 
-    void notifyConnectionForced(const std::string text);
     void setPassive(bool p);
+    bool isConnecting() const { return state == STATE_CONNECTING; }
 
     // PersistableConfig:
     void     setPersistenceId(uint64_t id) const;
@@ -165,7 +175,10 @@ class Link : public PersistableConfig, p
     void     encode(framing::Buffer& buffer) const;
     const std::string& getName() const;
 
+    static const std::string ENCODED_IDENTIFIER;
+    static const std::string ENCODED_IDENTIFIER_V1;
     static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+    static bool isEncodedLink(const std::string& key);
 
     // Manageable entry points
     management::ManagementObject*    GetManagementObject(void) const;
@@ -178,6 +191,16 @@ class Link : public PersistableConfig, p
     // replicate internal state of this Link for clustering
     void getState(framing::FieldTable& state) const;
     void setState(const framing::FieldTable& state);
+
+    /** create a name for a link (if none supplied by user config) */
+    static std::string createName(const std::string& transport,
+                                  const std::string& host,
+                                  uint16_t  port);
+
+    /** The current connection for this link. Note: returns 0 if the link is not
+     * presently connected.
+     */
+    Connection* getConnection() { return connection; }
 };
 }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org