You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [4/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp Fri Aug 3 12:13:32 2012
@@ -108,7 +108,6 @@ Broker::Options::Options(const std::stri
noDataDir(0),
port(DEFAULT_PORT),
workerThreads(5),
- maxConnections(500),
connectionBacklog(10),
enableMgmt(1),
mgmtPublish(1),
@@ -128,8 +127,10 @@ Broker::Options::Options(const std::stri
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
defaultMsgGroup("qpid.no-group"),
- timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
- linkMaintenanceInterval(2)
+ timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
+ linkMaintenanceInterval(2),
+ linkHeartbeatInterval(120),
+ maxNegotiateTime(2000) // 2s
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -146,7 +147,6 @@ Broker::Options::Options(const std::stri
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
- ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
@@ -171,6 +171,9 @@ Broker::Options::Options(const std::stri
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
+ ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
+ ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
+ ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
;
}
@@ -208,7 +211,6 @@ Broker::Broker(const Broker::Options& co
inCluster(false),
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
- connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
{
@@ -227,7 +229,6 @@ Broker::Broker(const Broker::Options& co
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
- mgmtObject->set_maxConns(conf.maxConnections);
mgmtObject->set_connBacklog(conf.connectionBacklog);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
mgmtObject->set_mgmtPublish(conf.mgmtPublish);
@@ -244,8 +245,11 @@ Broker::Broker(const Broker::Options& co
// management schema correct.
Vhost* vhost = new Vhost(this, this);
vhostObject = Vhost::shared_ptr(vhost);
- framing::Uuid uuid(managementAgent->getUuid());
- federationTag = uuid.str();
+ if (conf.fedTag.empty()) {
+ framing::Uuid uuid(managementAgent->getUuid());
+ federationTag = uuid.str();
+ } else
+ federationTag = conf.fedTag;
vhostObject->setFederationTag(federationTag);
queues.setParent(vhost);
@@ -254,8 +258,11 @@ Broker::Broker(const Broker::Options& co
} else {
// Management is disabled so there is no broker management ID.
// Create a unique uuid to use as the federation tag.
- framing::Uuid uuid(true);
- federationTag = uuid.str();
+ if (conf.fedTag.empty()) {
+ framing::Uuid uuid(true);
+ federationTag = uuid.str();
+ } else
+ federationTag = conf.fedTag;
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
@@ -346,7 +353,7 @@ Broker::Broker(const Broker::Options& co
knownBrokers.push_back(Url(conf.knownHosts));
}
- } catch (const std::exception& /*e*/) {
+ } catch (const std::exception&) {
finalize();
throw;
}
@@ -443,7 +450,7 @@ Manageable* Broker::GetVhostObject(void)
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& args,
- string&)
+ string& text)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -458,6 +465,14 @@ Manageable::status_t Broker::ManagementM
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
+ /** Management is creating a Link to a remote broker using the host and port of
+ * the remote. This (old) interface does not allow management to specify a name
+ * for the link, nor does it allow multiple Links to the same remote. Use the
+ * "create()" broker method if these features are needed.
+ * TBD: deprecate this interface.
+ */
+ QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='link' instead.");
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
@@ -466,13 +481,24 @@ Manageable::status_t Broker::ManagementM
"; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
+ text = "transport type not supported";
return Manageable::STATUS_NOT_IMPLEMENTED;
}
- std::pair<Link::shared_ptr, bool> response =
- links.declare (hp.i_host, hp.i_port, transport, hp.i_durable,
- hp.i_authMechanism, hp.i_username, hp.i_password);
- if (hp.i_durable && response.second)
- store->create(*response.first);
+
+ // Does a link to the remote already exist? If so, re-use the existing link
+ // - this behavior is backward compatible with previous releases.
+ if (!links.getLink(hp.i_host, hp.i_port, transport)) {
+ // new link, need to generate a unique name for it
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare(Link::createName(transport, hp.i_host, hp.i_port),
+ hp.i_host, hp.i_port, transport,
+ hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
+ if (!response.first) {
+ text = "Unable to create Link";
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ break;
+ }
+ }
status = Manageable::STATUS_OK;
break;
}
@@ -543,6 +569,8 @@ const std::string TYPE_QUEUE("queue");
const std::string TYPE_EXCHANGE("exchange");
const std::string TYPE_TOPIC("topic");
const std::string TYPE_BINDING("binding");
+const std::string TYPE_LINK("link");
+const std::string TYPE_BRIDGE("bridge");
const std::string DURABLE("durable");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
@@ -554,6 +582,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_
const std::string _TRUE("true");
const std::string _FALSE("false");
+
+// parameters for creating a Link object, see mgmt schema
+const std::string HOST("host");
+const std::string PORT("port");
+const std::string TRANSPORT("transport");
+const std::string AUTH_MECHANISM("authMechanism");
+const std::string USERNAME("username");
+const std::string PASSWORD("password");
+
+// parameters for creating a Bridge object, see mgmt schema
+const std::string LINK("link");
+const std::string SRC("src");
+const std::string DEST("dest");
+const std::string KEY("key");
+const std::string TAG("tag");
+const std::string EXCLUDES("excludes");
+const std::string SRC_IS_QUEUE("srcIsQueue");
+const std::string SRC_IS_LOCAL("srcIsLocal");
+const std::string DYNAMIC("dynamic");
+const std::string SYNC("sync");
}
struct InvalidBindingIdentifier : public qpid::Exception
@@ -603,6 +651,25 @@ struct UnknownObjectType : public qpid::
std::string getPrefix() const { return "unknown object type"; }
};
+struct ReservedObjectName : public qpid::Exception // Thrown by Broker::createObject() when a link or bridge name starts with QPID_NAME_PREFIX.
+{
+ ReservedObjectName(const std::string& type) : qpid::Exception(type) {} // Parameter is named 'type', but the visible callers pass the rejected object name.
+ std::string getPrefix() const { return std::string("names prefixed with '") // Fixed text describing this error kind; embeds QPID_NAME_PREFIX.
+ + QPID_NAME_PREFIX + std::string("' are reserved"); }
+};
+
+struct UnsupportedTransport : public qpid::Exception // Thrown by Broker::createObject() when getProtocolFactory() finds no factory for the requested transport.
+{
+ UnsupportedTransport(const std::string& type) : qpid::Exception(type) {} // The visible caller passes the transport name.
+ std::string getPrefix() const { return "transport is not supported"; } // Fixed text describing this error kind.
+};
+
+struct InvalidParameter : public qpid::Exception // Thrown by Broker::createObject() when a link or bridge cannot be declared, or a bridge names a link that is not found.
+{
+ InvalidParameter(const std::string& type) : qpid::Exception(type) {} // Parameter is named 'type', but the visible callers pass the name of the object being created.
+ std::string getPrefix() const { return "invalid parameter to method call"; } // Fixed text describing this error kind.
+};
+
void Broker::createObject(const std::string& type, const std::string& name,
const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
{
@@ -674,6 +741,113 @@ void Broker::createObject(const std::str
amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+
+ } else if (type == TYPE_LINK) {
+
+ QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string host;
+ uint16_t port = 0;
+ std::string transport = TCP_TRANSPORT;
+ bool durable = false;
+ std::string authMech, username, password;
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == HOST) host = i->second.asString();
+ else if (i->first == PORT) port = i->second.asUint16();
+ else if (i->first == TRANSPORT) transport = i->second.asString();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
+ else if (i->first == USERNAME) username = i->second.asString();
+ else if (i->first == PASSWORD) password = i->second.asString();
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ if (!getProtocolFactory(transport)) {
+ QPID_LOG(error, "Transport '" << transport << "' not supported.");
+ throw UnsupportedTransport(transport);
+ }
+
+ std::pair<boost::shared_ptr<Link>, bool> rc;
+ rc = links.declare(name, host, port, transport, durable, authMech, username, password);
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
+ "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
+
+ } else if (type == TYPE_BRIDGE) {
+
+ QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string linkName;
+ std::string src;
+ std::string dest;
+ std::string key;
+ std::string id;
+ std::string excludes;
+ std::string queueName;
+ bool durable = false;
+ bool srcIsQueue = false;
+ bool srcIsLocal = false;
+ bool dynamic = false;
+ uint16_t sync = 0;
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+
+ if (i->first == LINK) linkName = i->second.asString();
+ else if (i->first == SRC) src = i->second.asString();
+ else if (i->first == DEST) dest = i->second.asString();
+ else if (i->first == KEY) key = i->second.asString();
+ else if (i->first == TAG) id = i->second.asString();
+ else if (i->first == EXCLUDES) excludes = i->second.asString();
+ else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
+ else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
+ else if (i->first == DYNAMIC) dynamic = bool(i->second);
+ else if (i->first == SYNC) sync = i->second.asUint16();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == QUEUE_NAME) queueName = i->second.asString();
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ boost::shared_ptr<Link> link;
+ if (linkName.empty() || !(link = links.getLink(linkName))) {
+ QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
+ throw InvalidParameter(name);
+ }
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
+ dynamic, sync,
+ 0,
+ queueName);
+
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
+ "; src=" << src << "; dest=" << dest << "; key=" << key);
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
} else {
throw UnknownObjectType(type);
}
@@ -696,6 +870,16 @@ void Broker::deleteObject(const std::str
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ } else if (type == TYPE_LINK) {
+ boost::shared_ptr<Link> link = links.getLink(name);
+ if (link) {
+ link->close();
+ }
+ } else if (type == TYPE_BRIDGE) {
+ boost::shared_ptr<Bridge> bridge = links.getBridge(name);
+ if (bridge) {
+ bridge->close();
+ }
} else {
throw UnknownObjectType(type);
}
@@ -920,6 +1104,13 @@ std::pair<boost::shared_ptr<Queue>, bool
ManagementAgent::toMap(arguments),
"created"));
}
+ QPID_LOG_CAT(debug, model, "Create queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ << " durable:" << (durable ? "T" : "F")
+ << " owner:" << owner
+ << " autodelete:" << (autodelete ? "T" : "F")
+ << " alternateExchange:" << alternateExchange );
}
return result;
}
@@ -942,6 +1133,10 @@ void Broker::deleteQueue(const std::stri
if (managementAgent.get())
managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+ QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ );
}
@@ -993,6 +1188,12 @@ std::pair<Exchange::shared_ptr, bool> Br
ManagementAgent::toMap(arguments),
"created"));
}
+ QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ << " type:" << type
+ << " alternateExchange:" << alternateExchange
+ << " durable:" << (durable ? "T" : "F"));
}
return result;
}
@@ -1017,7 +1218,9 @@ void Broker::deleteExchange(const std::s
if (managementAgent.get())
managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-
+ QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId);
}
void Broker::bind(const std::string& queueName,
@@ -1047,10 +1250,16 @@ void Broker::bind(const std::string& que
throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
} else {
if (queue->bind(exchange, key, arguments)) {
+ getConfigurationObservers().bind(exchange, queue, key, arguments);
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
queueName, key, ManagementAgent::toMap(arguments)));
}
+ QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
+ << " queue:" << queueName
+ << " key:" << key
+ << " user:" << userId
+ << " rhost:" << connectionId);
}
}
}
@@ -1082,12 +1291,33 @@ void Broker::unbind(const std::string& q
if (exchange->isDurable() && queue->isDurable()) {
store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
}
+ getConfigurationObservers().unbind(
+ exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
}
+ QPID_LOG_CAT(debug, model, "Delete binding. exchange:" << exchangeName
+ << " queue:" << queueName
+ << " key:" << key
+ << " user:" << userId
+ << " rhost:" << connectionId);
}
}
}
+// FIXME aconway 2012-04-27: access to linkClientProperties is
+// not properly thread safe, you could lose fields if 2 threads
+// attempt to add a field concurrently.
+
+framing::FieldTable Broker::getLinkClientProperties() const { // Returns linkClientProperties by value, so the caller gets its own copy.
+ sys::Mutex::ScopedLock l(linkClientPropertiesLock); // Lock is taken before the member is read.
+ return linkClientProperties;
+}
+
+void Broker::setLinkClientProperties(const framing::FieldTable& ft) { // Replaces the whole linkClientProperties table with ft.
+ sys::Mutex::ScopedLock l(linkClientPropertiesLock); // Guards only this assignment; a caller's get-modify-set sequence is not atomic (see the FIXME above the getter).
+ linkClientProperties = ft;
+}
+
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h Fri Aug 3 12:13:32 2012
@@ -40,6 +40,7 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/ConfigurationObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -64,8 +65,8 @@
namespace qpid {
namespace sys {
- class ProtocolFactory;
- class Poller;
+class ProtocolFactory;
+class Poller;
}
struct Url;
@@ -91,7 +92,7 @@ class Broker : public sys::Runnable, pub
public management::Manageable,
public RefCounted
{
-public:
+ public:
struct Options : public qpid::Options {
static const std::string DEFAULT_DATA_DIR_LOCATION;
@@ -103,7 +104,6 @@ public:
std::string dataDir;
uint16_t port;
int workerThreads;
- int maxConnections;
int connectionBacklog;
bool enableMgmt;
bool mgmtPublish;
@@ -127,31 +127,14 @@ public:
std::string defaultMsgGroup;
bool timestampRcvMsgs;
double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values.
+ uint16_t linkHeartbeatInterval;
+ uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation
+ std::string fedTag;
private:
std::string getHome();
};
- class ConnectionCounter {
- int maxConnections;
- int connectionCount;
- sys::Mutex connectionCountLock;
- public:
- ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
- void inc_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount++;
- }
- void dec_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount--;
- }
- bool allowConnection() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- return (maxConnections <= connectionCount);
- }
- };
-
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
@@ -183,6 +166,7 @@ public:
AclModule* acl;
DataDir dataDir;
ConnectionObservers connectionObservers;
+ ConfigurationObservers configurationObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -203,9 +187,11 @@ public:
bool recovery;
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
+ mutable sys::Mutex linkClientPropertiesLock;
+ framing::FieldTable linkClientProperties;
+
public:
QPID_BROKER_EXTERN virtual ~Broker();
@@ -317,8 +303,6 @@ public:
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
- ConnectionCounter& getConnectionCounter() {return connectionCounter;}
-
/**
* Never true in a stand-alone broker. In a cluster, return true
* to defer delivery of messages delivered in a cluster-unsafe
@@ -377,6 +361,14 @@ public:
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ ConfigurationObservers& getConfigurationObservers() { return configurationObservers; }
+
+ /** Properties to be set on outgoing link connections */
+ QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
+ QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+
+ /** Information identifying this system */
+ boost::shared_ptr<const System> getSystem() const { return systemObject; }
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp Fri Aug 3 12:13:32 2012
@@ -43,7 +43,7 @@
#include <iostream>
#include <assert.h>
-
+using std::string;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -87,10 +87,14 @@ Connection::Connection(ConnectionOutputH
bool link_,
uint64_t objectId_,
bool shadow_,
- bool delayManagement) :
+ bool delayManagement,
+ bool authenticated_
+) :
ConnectionState(out_, broker_),
securitySettings(external),
- adapter(*this, link_, shadow_),
+ shadow(shadow_),
+ authenticated(authenticated_),
+ adapter(*this, link_),
link(link_),
mgmtClosing(false),
mgmtId(mgmtId_),
@@ -100,14 +104,12 @@ Connection::Connection(ConnectionOutputH
timer(broker_.getTimer()),
errorListener(0),
objectId(objectId_),
- shadow(shadow_),
outboundTracker(*this)
{
outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
- if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::addManagementObject() {
@@ -141,6 +143,8 @@ Connection::~Connection()
// a cluster-unsafe context. Don't raise an event in that case.
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+ QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
+ << " rhost:" << mgmtId );
}
broker.getConnectionObservers().closed(*this);
@@ -148,8 +152,9 @@ Connection::~Connection()
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
-
- if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
}
void Connection::received(framing::AMQFrame& frame) {
@@ -284,6 +289,10 @@ void Connection::raiseConnectEvent() {
mgmtObject->set_authIdentity(userId);
agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
}
+
+ QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
+ << " rhost:" << mgmtId );
+
}
void Connection::setUserProxyAuth(bool b)
@@ -300,6 +309,9 @@ void Connection::close(connection::Close
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -313,6 +325,9 @@ void Connection::sendClose() {
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -326,6 +341,9 @@ void Connection::closed(){ // Physically
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
@@ -435,6 +453,31 @@ struct ConnectionHeartbeatTask : public
}
};
+class LinkHeartbeatTask : public qpid::sys::TimerTask { // Periodic task that aborts a connection if heartbeatReceived() was not called since the previous firing.
+ sys::Timer& timer; // Timer this task re-adds itself to after a successful check.
+ Connection& connection; // Connection that is aborted when the check fails.
+ bool heartbeatSeen; // Set by heartbeatReceived(), cleared by fire().
+
+ void fire() { // Presumably invoked by sys::Timer when the period elapses - confirm against TimerTask.
+ if (!heartbeatSeen) { // Flag was never set during the last period: give up on the connection.
+ QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection");
+ connection.abort();
+ } else {
+ heartbeatSeen = false; // Re-arm: the next period must set the flag again.
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+ }
+ }
+
+public:
+ LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) : // period: interval between checks; c: connection to watch.
+ TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {} // Starts with the flag clear, so the first firing aborts unless heartbeatReceived() ran first.
+
+ void heartbeatReceived() { heartbeatSeen = true; } // Called from Connection::restartTimeout().
+};
+
+
void Connection::abort()
{
// Make sure that we don't try to send a heartbeat as we're
@@ -460,10 +503,21 @@ void Connection::setHeartbeatInterval(ui
}
}
+void Connection::startLinkHeartbeatTimeoutTask() { // Creates and schedules the LinkHeartbeatTask for this connection.
+ if (!linkHeartbeatTimer && heartbeat > 0) { // Does nothing if a task already exists or heartbeat is not greater than 0.
+ linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); // Period is twice 'heartbeat', scaled by TIME_SEC.
+ timer.add(linkHeartbeatTimer);
+ }
+}
+
void Connection::restartTimeout()
{
if (timeoutTimer)
timeoutTimer->touch();
+
+ if (linkHeartbeatTimer) {
+ static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived();
+ }
}
bool Connection::isOpen() { return adapter.isOpen(); }
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h Fri Aug 3 12:13:32 2012
@@ -27,8 +27,7 @@
#include <vector>
#include <queue>
-#include <boost/ptr_container/ptr_map.hpp>
-
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionHandler.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/SessionHandler.h"
@@ -86,15 +85,22 @@ class Connection : public sys::Connectio
bool isLink = false,
uint64_t objectId = 0,
bool shadow=false,
- bool delayManagement = false);
+ bool delayManagement = false,
+ bool authenticated=true);
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
SessionHandler& getChannel(framing::ChannelId channel);
- /** Close the connection */
- void close(framing::connection::CloseCode code, const std::string& text);
+ /** Close the connection. Waits for the client to respond with close-ok
+ * before actually destroying the connection.
+ */
+ QPID_BROKER_EXTERN void close(
+ framing::connection::CloseCode code, const std::string& text);
+
+ /** Abort the connection. Close abruptly and immediately. */
+ QPID_BROKER_EXTERN void abort();
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
@@ -138,8 +144,7 @@ class Connection : public sys::Connectio
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
void restartTimeout();
- void abort();
-
+
template <class F> void eachSessionHandler(F f) {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
f(*ptr_map_ptr(i));
@@ -149,7 +154,10 @@ class Connection : public sys::Connectio
void setSecureConnection(SecureConnection* secured);
/** True if this is a shadow connection in a cluster. */
- bool isShadow() { return shadow; }
+ bool isShadow() const { return shadow; }
+
+ /** True if this connection is authenticated */
+ bool isAuthenticated() const { return authenticated; }
// Used by cluster to update connection status
sys::AggregateOutput& getOutputTasks() { return outputTasks; }
@@ -166,6 +174,7 @@ class Connection : public sys::Connectio
bool isOpen();
bool isLink() { return link; }
+ void startLinkHeartbeatTimeoutTask();
// Used by cluster during catch-up, see cluster::OutputInterceptor
void doIoCallbacks();
@@ -179,6 +188,8 @@ class Connection : public sys::Connectio
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
+ bool shadow;
+ bool authenticated;
ConnectionHandler adapter;
const bool link;
bool mgmtClosing;
@@ -189,11 +200,10 @@ class Connection : public sys::Connectio
LinkRegistry& links;
management::ManagementAgent* agent;
sys::Timer& timer;
- boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
uint64_t objectId;
- bool shadow;
framing::FieldTable clientProperties;
/**
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionFactory.cpp Fri Aug 3 12:13:32 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory()
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10)) {
ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
@@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputCon
return c.release();
}
-
+
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Aug 3 12:13:32 2012
@@ -36,6 +36,9 @@
using namespace qpid;
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using qpid::sys::SecurityLayer;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -103,9 +106,10 @@ void ConnectionHandler::setSecureConnect
handler->secured = secured;
}
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool isShadow) : handler(new Handler(connection, isClient, isShadow)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) :
+ handler(new Handler(connection, isClient)) {}
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
proxy(c.getOutput()),
connection(c), serverMode(!isClient), secured(0),
isOpen(false)
@@ -116,14 +120,13 @@ ConnectionHandler::Handler::Handler(Conn
properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
- authenticator = SaslAuthenticator::createAuthenticator(c, isShadow);
+ authenticator = SaslAuthenticator::createAuthenticator(c);
authenticator->getMechanisms(mechanisms);
Array locales(0x95);
boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
locales.add(l);
proxy.start(properties, mechanisms, locales);
-
}
maxFrameSize = (64 * 1024) - 1;
@@ -149,12 +152,20 @@ void ConnectionHandler::Handler::startOk
authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
} catch (std::exception& /*e*/) {
management::ManagementAgent* agent = connection.getAgent();
- if (agent) {
+ bool logEnabled;
+ QPID_LOG_TEST_CAT(debug, model, logEnabled);
+ if (logEnabled || agent)
+ {
string error;
string uid;
authenticator->getError(error);
authenticator->getUid(uid);
- agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ if (agent) {
+ agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ }
+ QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
+ << " user:" << uid
+ << " reason:" << error );
}
throw;
}
@@ -169,7 +180,9 @@ void ConnectionHandler::Handler::startOk
AclModule* acl = connection.getBroker().getAcl();
FieldTable properties;
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
- proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
+ proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+ QPID_MSG("ACL denied " << connection.getUserId()
+ << " creating a federation link"));
return;
}
QPID_LOG(info, "Connection is a federation link");
@@ -195,12 +208,20 @@ void ConnectionHandler::Handler::secureO
authenticator->step(response);
} catch (std::exception& /*e*/) {
management::ManagementAgent* agent = connection.getAgent();
- if (agent) {
+ bool logEnabled;
+ QPID_LOG_TEST_CAT(debug, model, logEnabled);
+ if (logEnabled || agent)
+ {
string error;
string uid;
authenticator->getError(error);
authenticator->getUid(uid);
- agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ if (agent) {
+ agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ }
+ QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
+ << " user:" << uid
+ << " reason:" << error );
}
throw;
}
@@ -278,7 +299,7 @@ void ConnectionHandler::Handler::start(c
service,
host,
0, // TODO -- mgoulish Fri Sep 24 2010
- 256,
+ 256,
false ); // disallow interaction
}
std::string supportedMechanismsList;
@@ -318,7 +339,7 @@ void ConnectionHandler::Handler::start(c
connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
}
- FieldTable ft;
+ FieldTable ft = connection.getBroker().getLinkClientProperties();
ft.setInt(QPID_FED_LINK,1);
ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
@@ -367,8 +388,14 @@ void ConnectionHandler::Handler::tune(ui
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
connection.setFrameMax(maxFrameSize);
- connection.setHeartbeat(heartbeatMax);
- proxy.tuneOk(channelMax, maxFrameSize, heartbeatMax);
+ // this method is only ever called when this Connection
+ // is a federation link where this Broker is acting as
+ // a client to another Broker
+ uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax);
+ connection.setHeartbeat(hb);
+ connection.startLinkHeartbeatTimeoutTask();
+
+ proxy.tuneOk(channelMax, maxFrameSize, hb);
proxy.open("/", Array(), true);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.h Fri Aug 3 12:13:32 2012
@@ -61,7 +61,7 @@ class ConnectionHandler : public framing
SecureConnection* secured;
bool isOpen;
- Handler(Connection& connection, bool isClient, bool isShadow=false);
+ Handler(Connection& connection, bool isClient);
~Handler();
void startOk(const qpid::framing::ConnectionStartOkBody& body);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -99,7 +99,7 @@ class ConnectionHandler : public framing
bool handle(const qpid::framing::AMQMethodBody& method);
public:
- ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
+ ConnectionHandler(Connection& connection, bool isClient );
void close(framing::connection::CloseCode code, const std::string& text);
void heartbeat();
void handle(framing::AMQFrame& frame);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionObservers.h Fri Aug 3 12:13:32 2012
@@ -23,9 +23,7 @@
*/
#include "ConnectionObserver.h"
-#include "qpid/sys/Mutex.h"
-#include <set>
-#include <algorithm>
+#include "Observers.h"
namespace qpid {
namespace broker {
@@ -35,18 +33,10 @@ namespace broker {
* Calling a ConnectionObserver function will call that function on each observer.
* THREAD SAFE.
*/
-class ConnectionObservers : public ConnectionObserver {
+class ConnectionObservers : public ConnectionObserver,
+ public Observers<ConnectionObserver>
+{
public:
- void add(boost::shared_ptr<ConnectionObserver> observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.insert(observer);
- }
-
- void remove(boost::shared_ptr<ConnectionObserver> observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.erase(observer);
- }
-
void connection(Connection& c) {
each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
}
@@ -62,16 +52,6 @@ class ConnectionObservers : public Conne
void forced(Connection& c, const std::string& text) {
each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text));
}
-
- private:
- typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
- sys::Mutex lock;
- Observers observers;
-
- template <class F> void each(F f) {
- sys::Mutex::ScopedLock l(lock);
- std::for_each(observers.begin(), observers.end(), f);
- }
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h Fri Aug 3 12:13:32 2012
@@ -54,7 +54,9 @@ class Consumer
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
+ /**@return the position of the last message seen by this consumer */
virtual framing::SequenceNumber getPosition() const { return position; }
+
virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
virtual bool deliver(QueuedMessage& msg) = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Daemon.h Fri Aug 3 12:13:32 2012
@@ -74,7 +74,6 @@ class Daemon : private boost::noncopyabl
pid_t pid;
int pipeFds[2];
- int lockFileFd;
std::string lockFile;
std::string pidDir;
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp Fri Aug 3 12:13:32 2012
@@ -26,6 +26,9 @@
#include <iostream>
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::management::Manageable;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp Fri Aug 3 12:13:32 2012
@@ -35,6 +35,8 @@
namespace qpid {
namespace broker {
+using std::string;
+
using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
@@ -167,7 +169,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
- name(_name), durable(false), persistenceId(0), sequence(false),
+ name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Fri Aug 3 12:13:32 2012
@@ -174,8 +174,9 @@ public:
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
- Exchange::shared_ptr getAlternate() { return alternate; }
- void setAlternate(Exchange::shared_ptr _alternate);
+ QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
+ QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
+
void incAlternateUsers() { alternateUsers++; }
void decAlternateUsers() { alternateUsers--; }
bool inUseAsAlternate() { return alternateUsers > 0; }
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Aug 3 12:13:32 2012
@@ -19,6 +19,7 @@
*
*/
+#include "qpid/broker/Broker.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
@@ -42,38 +43,42 @@ pair<Exchange::shared_ptr, bool> Exchang
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
bool durable, const FieldTable& args){
- RWlock::ScopedWlock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- Exchange::shared_ptr exchange;
-
- if (type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
- }else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
- }else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
- }else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
- }else if (type == ManagementDirectExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
- }else if (type == ManagementTopicExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
- }else if (type == Link::exchangeTypeName) {
- exchange = Link::linkExchangeFactory(name);
- }else{
- FunctionMap::iterator i = factory.find(type);
- if (i == factory.end()) {
- throw UnknownExchangeTypeException();
- } else {
- exchange = i->second(name, durable, args, parent, broker);
+ Exchange::shared_ptr exchange;
+ std::pair<Exchange::shared_ptr, bool> result;
+ {
+ RWlock::ScopedWlock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ if (type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementDirectExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementTopicExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
+ }else if (type == Link::exchangeTypeName) {
+ exchange = Link::linkExchangeFactory(name);
+ }else{
+ FunctionMap::iterator i = factory.find(type);
+ if (i == factory.end()) {
+ throw UnknownExchangeTypeException();
+ } else {
+ exchange = i->second(name, durable, args, parent, broker);
+ }
}
+ exchanges[name] = exchange;
+ result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
- exchanges[name] = exchange;
- return std::pair<Exchange::shared_ptr, bool>(exchange, true);
- } else {
- return std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
+ if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
+ return result;
}
void ExchangeRegistry::destroy(const string& name){
@@ -82,12 +87,17 @@ void ExchangeRegistry::destroy(const str
(name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
name == "qpid.management")
throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
- RWlock::ScopedWlock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i != exchanges.end()) {
- i->second->destroy();
- exchanges.erase(i);
+ Exchange::shared_ptr exchange;
+ {
+ RWlock::ScopedWlock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i != exchanges.end()) {
+ exchange = i->second;
+ i->second->destroy();
+ exchanges.erase(i);
+ }
}
+ if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
}
Exchange::shared_ptr ExchangeRegistry::find(const string& name){
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp Fri Aug 3 12:13:32 2012
@@ -24,6 +24,9 @@
#include <algorithm>
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp Fri Aug 3 12:13:32 2012
@@ -26,6 +26,9 @@
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp Fri Aug 3 12:13:32 2012
@@ -125,18 +125,20 @@ boost::shared_ptr<Exchange> Link::linkEx
return Exchange::shared_ptr(new LinkExchange(_name));
}
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
+Link::Link(const string& _name,
+ LinkRegistry* _links,
const string& _host,
uint16_t _port,
const string& _transport,
+ DestroyedListener l,
bool _durable,
const string& _authMechanism,
const string& _username,
const string& _password,
Broker* _broker,
- Manageable* parent)
- : links(_links), store(_store),
+ Manageable* parent,
+ bool failover_)
+ : name(_name), links(_links),
configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
host(_host), port(_port), transport(_transport),
durable(_durable),
@@ -149,7 +151,9 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
+ listener(l),
timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failover(failover_),
failoverChannel(0)
{
if (parent != 0 && broker != 0)
@@ -157,7 +161,10 @@ Link::Link(LinkRegistry* _links,
agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+ mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
agent->addObject(mgmtObject, 0, durable);
}
}
@@ -169,13 +176,15 @@ Link::Link(LinkRegistry* _links,
}
broker->getTimer().add(timerTask);
- stringstream _name;
- _name << "qpid.link." << transport << ":" << host << ":" << port;
- std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
- exchangeTypeName);
- failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
- assert(failoverExchange);
- failoverExchange->setLink(this);
+ if (failover) {
+ stringstream exchangeName;
+ exchangeName << "qpid.link." << name;
+ std::pair<Exchange::shared_ptr, bool> rc =
+ broker->getExchanges().declare(exchangeName.str(), exchangeTypeName);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
+ }
}
Link::~Link ()
@@ -187,7 +196,8 @@ Link::~Link ()
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
- broker->getExchanges().destroy(failoverExchange->getName());
+ if (failover)
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -239,16 +249,19 @@ void Link::established(Connection* c)
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
-
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- connection = c;
- if (closing)
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ isClosing = closing;
+ }
+ if (isClosing)
destroy();
else // Process any IO tasks bridges added before established.
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -261,16 +274,26 @@ void Link::setUrl(const Url& u) {
namespace {
- /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
- void sessionDetached(Link *link) {
- QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
- }
+class DetachedCallback : public SessionHandler::ErrorListener {
+ public:
+ DetachedCallback(const Link& link) : name(link.getName()) {}
+ void connectionException(framing::connection::CloseCode, const std::string&) {}
+ void channelException(framing::session::DetachCode, const std::string&) {}
+ void executionException(framing::execution::ErrorCode, const std::string&) {}
+ void detach() {}
+ private:
+ const std::string name;
+};
}
-
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
+
+ if (!hideManagement() && connection->GetManagementObject()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+ }
+
// Get default URL from known-hosts if not already set
if (url.empty()) {
const std::vector<Url>& known = connection->getKnownHosts();
@@ -282,80 +305,82 @@ void Link::opened() {
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
- //
- // attempt to subscribe to failover exchange for updates from remote
- //
-
- const std::string queueName = "qpid.link." + framing::Uuid(true).str();
- failoverChannel = nextChannel();
-
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
- failoverSession = queueName;
- sessionHandler.attachAs(failoverSession);
-
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-
- remoteBroker.getQueue().declare(queueName,
- "", // alt-exchange
- false, // passive
- false, // durable
- true, // exclusive
- true, // auto-delete
- FieldTable());
- remoteBroker.getExchange().bind(queueName,
- FAILOVER_EXCHANGE,
- "", // no key
- FieldTable());
- remoteBroker.getMessage().subscribe(queueName,
- failoverExchange->getName(),
- 1, // implied-accept mode
- 0, // pre-acquire mode
- false, // exclusive
- "", // resume-id
- 0, // resume-ttl
+ if (failover) {
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ failoverChannel = nextChannel();
+
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ sessionHandler.setErrorListener(
+ boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
+ failoverSession = queueName;
+ sessionHandler.attachAs(failoverSession);
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
FieldTable());
- remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
- remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
+ FieldTable());
+ remoteBroker.getMessage().subscribe(queueName,
+ failoverExchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ }
}
void Link::closed(int, std::string text)
{
- bool isClosing = false;
- {
- Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
+ Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
- connection = 0;
- if (state == STATE_OPERATIONAL) {
+ connection = 0;
+
+ if (!hideManagement()) {
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
stringstream addr;
addr << host << ":" << port;
- if (!hideManagement() && agent)
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
+ }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- created.push_back(*i);
- }
- active.clear();
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ created.push_back(*i);
+ }
+ active.clear();
- if (state != STATE_FAILED && state != STATE_PASSIVE)
- {
- setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (text);
- }
+ if (state != STATE_FAILED && state != STATE_PASSIVE)
+ {
+ setStateLH(STATE_WAITING);
+ if (!hideManagement())
+ mgmtObject->set_lastError (text);
}
- // Call destroy outside of the lock, don't want to be deleted with lock held.
- if (isClosing)
- destroy();
}
-// Called in connection IO thread.
+// Called in connection IO thread, cleans up the connection before destroying Link
void Link::destroy ()
{
Bridges toDelete;
+
+ timerTask->cancel(); // call prior to locking so maintenance visit can finish
{
Mutex::ScopedLock mutex(lock);
@@ -374,14 +399,13 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
-
- timerTask->cancel();
}
+
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
+ (*i)->close();
toDelete.clear();
- links->destroy (configuredHost, configuredPort);
+ listener(this); // notify LinkRegistry that this Link has been destroyed
}
void Link::add(Bridge::shared_ptr bridge)
@@ -423,7 +447,7 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
- if (state != STATE_OPERATIONAL)
+ if (state != STATE_OPERATIONAL || closing)
return;
// check for bridge session errors and recover
@@ -460,7 +484,7 @@ void Link::ioThreadProcessing()
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
-
+ if (closing) return;
if (state == STATE_WAITING)
{
visitCount++;
@@ -476,21 +500,27 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
+ else if (state == STATE_OPERATIONAL &&
+ (!active.empty() || !created.empty() || !cancellations.empty()) &&
+ connection && connection->isOpen())
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
+}
void Link::reconnectLH(const Address& a)
{
host = a.host;
port = a.port;
transport = a.protocol;
- startConnectionLH();
+
if (!hideManagement()) {
stringstream errorString;
- errorString << "Failed over to " << a;
+ errorString << "Failing over to " << a;
mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
}
+ startConnectionLH();
}
bool Link::tryFailoverLH() {
@@ -499,15 +529,14 @@ bool Link::tryFailoverLH() {
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
- links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next);
reconnectLH(next);
return true;
}
return false;
}
-// Management updates for a linke are inconsistent in a cluster, so they are
+// Management updates for a link are inconsistent in a cluster, so they are
// suppressed.
bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
@@ -536,18 +565,34 @@ void Link::setPersistenceId(uint64_t id)
const string& Link::getName() const
{
- return configuredHost;
+ return name;
+}
+
+const std::string Link::ENCODED_IDENTIFIER("link.v2");
+const std::string Link::ENCODED_IDENTIFIER_V1("link");
+
+bool Link::isEncodedLink(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string transport;
string authMechanism;
string username;
string password;
+ string name;
+ if (kind == ENCODED_IDENTIFIER) {
+ // newer version provides a link name.
+ buffer.getShortString(name);
+ }
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -556,12 +601,21 @@ Link::shared_ptr Link::decode(LinkRegist
buffer.getShortString(username);
buffer.getShortString(password);
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the Link by host:port, there was no name
+ * assigned. So create a name for the new Link.
+ */
+ name = createName(transport, host, port);
+ }
+
+ return links.declare(name, host, port, transport, durable, authMechanism,
+ username, password).first;
}
void Link::encode(Buffer& buffer) const
{
- buffer.putShortString(string("link"));
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
buffer.putShortString(configuredHost);
buffer.putShort(configuredPort);
buffer.putShortString(configuredTransport);
@@ -573,8 +627,9 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return configuredHost.size() + 1 // short-string (host)
- + 5 // short-string ("link")
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + configuredHost.size() + 1 // short-string (host)
+ 2 // port
+ configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
@@ -589,6 +644,7 @@ ManagementObject* Link::GetManagementObj
}
void Link::close() {
+ QPID_LOG(debug, "Link::close(), link=" << name );
Mutex::ScopedLock mutex(lock);
if (!closing) {
closing = true;
@@ -609,36 +665,31 @@ Manageable::status_t Link::ManagementMet
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
+ /* TBD: deprecate this interface in favor of the Broker::create() method. The
+ * Broker::create() method allows the user to assign a name to the bridge.
+ */
+ QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='bridge' instead.");
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
- QPID_LOG(debug, "Link::bridge() request received");
+ QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
+ "; dest=" << iargs.i_dest << "; key=" << iargs.i_key);
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
+ // Does a bridge already exist that has the src/dest/key? If so, re-use the
+ // existing bridge - this behavior is backward compatible with previous releases.
+ Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
+ if (!bridge) {
+ // need to create a new bridge on this link.
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+ *this, iargs.i_durable,
+ iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
+ if (!rc.first) {
+ text = "invalid parameters";
+ return Manageable::STATUS_PARAMETER_INVALID;
}
}
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
return Manageable::STATUS_OK;
}
@@ -666,11 +717,13 @@ void Link::closeConnection( const std::s
{
if (connection != 0) {
// cancel our subscription to the failover exchange
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- if (sessionHandler.getSession()) {
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
- remoteBroker.getMessage().cancel(failoverExchange->getName());
- remoteBroker.getSession().detach(failoverSession);
+ if (failover) {
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverSession);
+ }
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;
@@ -716,6 +769,23 @@ void Link::setState(const framing::Field
}
}
+std::string Link::createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port)
+{
+ stringstream linkName;
+ linkName << QPID_NAME_PREFIX << transport << std::string(":")
+ << host << std::string(":") << port;
+ return linkName.str();
+}
+
+
+bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
+{
+ Mutex::ScopedLock mutex(lock);
+ return (isConnecting() && _port == port && _host == host);
+}
+
const std::string Link::exchangeTypeName("qpid.LinkExchange");
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.h Fri Aug 3 12:13:32 2012
@@ -25,7 +25,6 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
@@ -52,8 +51,8 @@ class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
mutable sys::Mutex lock;
+ const std::string name;
LinkRegistry* links;
- MessageStore* store;
// these remain constant across failover - used to identify this link
const std::string configuredTransport;
@@ -64,7 +63,8 @@ class Link : public PersistableConfig, p
uint16_t port;
std::string transport;
- bool durable;
+ bool durable;
+
std::string authMechanism;
std::string username;
std::string password;
@@ -85,8 +85,10 @@ class Link : public PersistableConfig, p
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
+ boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
+ bool failover; // Do we subscribe to a failover exchange?
uint failoverChannel;
std::string failoverSession;
@@ -101,33 +103,39 @@ class Link : public PersistableConfig, p
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
+ void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
+ void reconnectLH(const Address&); //called by LinkRegistry
- void established(Connection*); // Called when connection is create
+ // connection management (called by LinkRegistry)
+ void established(Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
- void reconnectLH(const Address&); //called by LinkRegistry
+ void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
+ bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote?
friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
+ typedef boost::function<void(Link*)> DestroyedListener;
- Link(LinkRegistry* links,
- MessageStore* store,
+ Link(const std::string& name,
+ LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
+ DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password,
Broker* broker,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0,
+ bool failover=true);
virtual ~Link();
/** these return the *configured* transport/host/port, which does not change over the
@@ -139,7 +147,7 @@ class Link : public PersistableConfig, p
/** returns the current address of the remote, which may be different from the
configured transport/host/port due to failover. Returns true if connection is
active */
- bool getRemoteAddress(qpid::Address& addr) const;
+ QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
@@ -148,15 +156,17 @@ class Link : public PersistableConfig, p
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
- QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
+
+ // Close the link.
+ QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
- void notifyConnectionForced(const std::string text);
void setPassive(bool p);
+ bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
@@ -165,7 +175,10 @@ class Link : public PersistableConfig, p
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
@@ -178,6 +191,16 @@ class Link : public PersistableConfig, p
// replicate internal state of this Link for clustering
void getState(framing::FieldTable& state) const;
void setState(const framing::FieldTable& state);
+
+ /** create a name for a link (if none supplied by user config) */
+ static std::string createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port);
+
+ /** The current connection for this link. Note returns 0 if the link is not
+ * presently connected.
+ */
+ Connection* getConnection() { return connection; }
};
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org