You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [10/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Feb 28 16:14:30 2013
@@ -23,16 +23,19 @@
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -41,6 +44,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include <boost/bind.hpp>
#include <algorithm>
#include <sstream>
#include <iostream>
@@ -57,23 +61,25 @@ using qmf::org::apache::qpid::broker::Ev
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
+using qpid::broker::amqp_0_10::MessageTransfer;
using namespace framing;
-using std::string;
+using namespace std;
using std::ostream;
using types::Variant;
using namespace broker;
namespace {
-const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
const string CLASS_NAME("_class_name");
const string EVENT("_event");
const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
const string QUERY_RESPONSE("_query_response");
-const string SCHEMA_ID("_schema_id");
const string VALUES("_values");
+const string SCHEMA_ID("_schema_id");
+const string WHAT("_what");
const string ALTEX("altEx");
const string ALTEXCHANGE("altExchange");
@@ -81,24 +87,27 @@ const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
-const string EXCL("excl");
-const string EXCLUSIVE("exclusive");
const string BIND("bind");
-const string UNBIND("unbind");
const string BINDING("binding");
+const string BINDING_KEY("bindingKey");
const string CREATED("created");
const string DISP("disp");
+const string DEST("dest");
const string DURABLE("durable");
const string EXCHANGE("exchange");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
const string EXNAME("exName");
const string EXTYPE("exType");
+const string HA_BROKER("habroker");
const string KEY("key");
const string NAME("name");
+const string PARTIAL("partial");
const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
-const string HA_BROKER("habroker");
-const string PARTIAL("partial");
+const string UNBIND("unbind");
+const string CONSUMER_COUNT("consumerCount");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -107,10 +116,6 @@ const string QMF_CONTENT("qmf.content");
const string QMF_DEFAULT_TOPIC("qmf.default.topic");
const string QMF_OPCODE("qmf.opcode");
-const string _WHAT("_what");
-const string _CLASS_NAME("_class_name");
-const string _PACKAGE_NAME("_package_name");
-const string _SCHEMA_ID("_schema_id");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
@@ -118,21 +123,18 @@ const string QMF_DEFAULT_DIRECT("qmf.def
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
const string MEMBERS("members");
-
-template <class T> bool match(Variant::Map& schema) {
- return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
-}
+const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
{
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
- request[_WHAT] = OBJECT;
+ request[WHAT] = OBJECT;
Variant::Map schema;
- schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = packageName;
- request[_SCHEMA_ID] = schema;
+ schema[CLASS_NAME] = className;
+ schema[PACKAGE_NAME] = packageName;
+ request[SCHEMA_ID] = schema;
AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
method.setBof(true);
@@ -170,19 +172,144 @@ Variant::Map asMapVoid(const Variant& va
}
} // namespace
+// Listens for errors on the bridge session and logs them; none of the handlers takes any other action.
+class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& lp, BrokerReplicator& br) : // lp: prefix prepended to every log line
+ logPrefix(lp), brokerReplicator(br) {}
+
+ void connectionException(framing::connection::CloseCode, const std::string& msg) { // close code is ignored, only msg is logged
+ QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ }
+ void channelException(framing::session::DetachCode, const std::string& msg) { // detach code is ignored, only msg is logged
+ QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ }
+ void executionException(framing::execution::ErrorCode, const std::string& msg) { // error code is ignored, only msg is logged
+ QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ }
+ void detach() { // logged at debug level, unlike the error handlers above
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+ BrokerReplicator& brokerReplicator; // stored by the constructor; not read by any method of this class
+};
+
+class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver // notifies the BrokerReplicator when its own connection closes
+{
+ public:
+ ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
+ virtual void connection(Connection&) {} // no-op
+ virtual void opened(Connection&) {} // no-op
+
+ virtual void closed(Connection& c) { // other connections are ignored
+ if (brokerReplicator.link && &c == brokerReplicator.connection) // act only if a link is set and c is the recorded connection
+ brokerReplicator.disconnected();
+ }
+ virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); } // a forced close is handled exactly like closed()
+ private:
+ BrokerReplicator& brokerReplicator;
+};
+
+/** Keep track of queues or exchanges during the update process to solve 2
+ * problems.
+ *
+ * 1. Once all responses are processed, remove any queues/exchanges
+ * that were not mentioned as they no longer exist on the primary.
+ *
+ * 2. During the update if we see an event for an object we should
+ * ignore any subsequent responses for that object as they are out
+ * of date.
+ *
+ * Names added via addExchange()/addQueue() form the "initial" set. Every
+ * event() or response() removes its name from that set; whatever is left
+ * when the tracker is destroyed is passed to the clean function.
+ */
+class BrokerReplicator::UpdateTracker {
+ public:
+ typedef std::set<std::string> Names;
+ typedef boost::function<void (const std::string&)> CleanFn;
+
+ UpdateTracker(const std::string& type_, // "queue" or "exchange"
+ CleanFn f, const ReplicationTest& rt) // f: called for each leftover name; rt: copied into repTest
+ : type(type_), cleanFn(f), repTest(rt) {}
+
+ /** Destructor cleans up remaining initial queues or exchanges by calling
+ * cleanFn on every name still in the initial set.
+ * NOTE(review): cleanFn is invoked directly, so the private clean()
+ * wrapper and its "Backup updated, deleting ..." log line are bypassed.
+ */
+ ~UpdateTracker() {
+ // Don't throw in a destructor: a std::exception from cleanFn is logged and swallowed.
+ try { for_each(initial.begin(), initial.end(), cleanFn); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+ }
+ }
+
+ /** Add an exchange name to the initial set, but only if
+ * repTest.getLevel(*ex) evaluates true for the exchange.
+ */
+ void addExchange(Exchange::shared_ptr ex) {
+ if (repTest.getLevel(*ex))
+ initial.insert(ex->getName());
+ }
+
+ /** Add a queue name to the initial set, but only if
+ * repTest.getLevel(*q) evaluates true for the queue.
+ */
+ void addQueue(Queue::shared_ptr q) {
+ if (repTest.getLevel(*q))
+ initial.insert(q->getName());
+ }
+
+ /** Received an event for name: remember it so later responses for the
+ * same name are reported as out of date by response().
+ */
+ void event(const std::string& name) {
+ initial.erase(name); // no longer a candidate for deleting
+ events.insert(name); // we have seen an event for this name
+ }
+
+ /** Received a response for name.
+ *@return true if this response should be processed, false if we have
+ *already seen an event for this object.
+ */
+ bool response(const std::string& name) {
+ initial.erase(name); // no longer a candidate for deleting
+ return events.find(name) == events.end(); // true if no event seen yet.
+ }
+
+ private:
+ void clean(const std::string& name) { // logs the deletion, then delegates to cleanFn; not called anywhere inside this class
+ QPID_LOG(info, "Backup updated, deleting " << type << " " << name);
+ cleanFn(name);
+ }
+
+ std::string type;
+ Names initial, events; // initial: deletion candidates; events: names for which an event was seen
+ CleanFn cleanFn;
+ ReplicationTest repTest;
+};
+
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
- haBroker(hb), broker(hb.getBroker()), link(l),
+ logPrefix("Backup: "), replicationTest(NONE),
+ haBroker(hb), broker(hb.getBroker()),
+ exchanges(broker.getExchanges()), queues(broker.getQueues()),
+ link(l),
initialized(false),
- alternates(hb.getBroker().getExchanges())
-{}
+ alternates(hb.getBroker().getExchanges()),
+ connection(0)
+{
+ connectionObserver.reset(new ConnectionObserver(*this));
+ broker.getConnectionObservers().add(connectionObserver);
+ framing::FieldTable args = getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str());
+ setArgs(args);
+
+ dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
+ dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
+ dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare;
+ dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete;
+ dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind;
+ dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind;
+ dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate;
+ dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe;
+}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
types::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
- broker.getLinks().declare(
+ std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
name, // name for bridge
*link, // parent
false, // durable
@@ -195,21 +322,47 @@ void BrokerReplicator::initialize() {
"", // excludes
false, // dynamic
0, // sync?
- // shared_ptr keeps this in memory until outstanding initializeBridge
+ // shared_ptr keeps this in memory until outstanding connected
// calls are run.
- boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
+ boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
+ assert(result.second);
+ result.first->setErrorListener(
+ boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
-BrokerReplicator::~BrokerReplicator() { }
+BrokerReplicator::~BrokerReplicator() { shutdown(); }
+
+namespace {
+void collectQueueReplicators( // inserts ex into collect when ex is a QueueReplicator; any other exchange is ignored
+ const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+{
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); // null unless ex is a QueueReplicator
+ if (qr) collect.insert(qr);
+}
+} // namespace
+
+void BrokerReplicator::shutdown() {
+ // NOTE: this is called in a QMF dispatch thread, not the Link's connection
+ // thread. It's OK to be unlocked because it doesn't use any mutable state;
+ // it only calls thread-safe functions on objects belonging to the Broker.
+
+ // Unregister with broker objects:
+ if (connectionObserver) {
+ broker.getConnectionObservers().remove(connectionObserver);
+ connectionObserver.reset(); // drop our reference so a repeated shutdown() skips this branch
+ }
+ broker.getExchanges().destroy(getName()); // remove this replicator's own exchange entry by name
+}
// This is called in the connection IO thread when the bridge is started.
-void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) {
// Use the credentials of the outgoing Link connection for creating queues,
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connection's thread context.
//
- assert(link->getConnection());
+ connection = link->getConnection();
+ assert(connection);
userId = link->getConnection()->getUserId();
remoteHost = link->getConnection()->getUrl();
@@ -221,6 +374,19 @@ void BrokerReplicator::initializeBridge(
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
+ exchangeTracker.reset(
+ new UpdateTracker("exchange",
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+ replicationTest));
+ exchanges.eachExchange(
+ boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+
+ queueTracker.reset(
+ new UpdateTracker("queue",
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+ replicationTest));
+ queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -231,9 +397,14 @@ void BrokerReplicator::initializeBridge(
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
//subscribe to the queue
- peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ FieldTable arguments;
+ arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ peer.getMessage().subscribe(
+ queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(args.i_dest, 1); // Window
+ peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages());
+ peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes());
// Issue a query request for queues, exchanges, bindings and the habroker
// using event queue as the reply-to address
@@ -247,12 +418,12 @@ void BrokerReplicator::route(Deliverable
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
if (haBroker.getStatus() == JOINING) {
- haBroker.setStatus(CATCHUP);
+ haBroker.getMembership().setStatus(CATCHUP);
QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
}
Variant::List list;
try {
- if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
+ if (!MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
string content = msg.getMessage().getContent();
@@ -264,13 +435,9 @@ void BrokerReplicator::route(Deliverable
QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
- else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
- else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
- else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
- else if (match<EventBind>(schema)) doEventBind(values);
- else if (match<EventUnbind>(schema)) doEventUnbind(values);
- else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
+ EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+ EventDispatchMap::iterator j = dispatch.find(key);
+ if (j != dispatch.end()) (this->*(j->second))(values);
}
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -285,15 +452,21 @@ void BrokerReplicator::route(Deliverable
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
- if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
- // We have received all of the exchange response.
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
+ QPID_LOG(debug, logPrefix << "All exchange responses received.")
+ exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
+ QPID_LOG(debug, logPrefix << "All queue responses received.");
+ queueTracker.reset(); // Clean up queues that no longer exist in the primary
+ }
}
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
- << ": while handling: " << list);
- haBroker.shutdown();
+;
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Configuration replication failed: "
+ << e.what() << ": while handling: " << list));
throw;
}
}
@@ -301,31 +474,22 @@ void BrokerReplicator::route(Deliverable
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
- bool autoDel = values[AUTODEL].asBool();
- bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ if (queueTracker.get()) queueTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
- if (broker.getQueues().find(name)) {
- QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
- broker.getQueues().destroy(name);
- stopQueueReplicator(name);
- }
- settings.populate(args, settings.storeSettings);
- std::pair<boost::shared_ptr<Queue>, bool> result =
- broker.createQueue(
- name,
- settings,
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values[ALTEX].asString(),
- userId,
- remoteHost);
- assert(result.second); // Should be true since we destroyed existing queue above
- startQueueReplicator(result.first);
+ if (queues.find(name)) {
+ QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: "
+ << name);
+ deleteQueue(name);
+ }
+ replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
+ values[ALTEX].asString());
}
}
@@ -333,7 +497,7 @@ boost::shared_ptr<QueueReplicator> Broke
const std::string& qname)
{
string rname = QueueReplicator::replicatorName(qname);
- boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname);
return boost::dynamic_pointer_cast<QueueReplicator>(ex);
}
@@ -341,79 +505,85 @@ void BrokerReplicator::doEventQueueDelet
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
- boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+ boost::shared_ptr<Queue> queue = queues.find(name);
+ if (queue && replicationTest.getLevel(*queue)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
- stopQueueReplicator(name);
- broker.deleteQueue(name, userId, remoteHost);
+ if (queueTracker.get()) queueTracker->event(name);
+ deleteQueue(name);
}
}
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
+ if (exchangeTracker.get()) exchangeTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have an exchange with this name, replace it.
// The exchange was definitely created on the primary.
- if (broker.getExchanges().find(name)) {
- broker.getExchanges().destroy(name);
- QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
- }
- boost::shared_ptr<Exchange> exchange =
- createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
- assert(exchange);
+ if (exchanges.find(name)) {
+ deleteExchange(name);
+ QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
+ << name);
+ }
+ CreateExchangeResult result = createExchange(
+ name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
+ values[ALTEX].asString());
+ replicatedExchanges.insert(name);
+ assert(result.second);
}
}
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
- } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+ QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
+ } else if (!replicationTest.getLevel(*exchange)) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- broker.deleteExchange(name, userId, remoteHost);
+ if (exchangeTracker.get()) exchangeTracker->event(name);
+ deleteExchange(name);
+ replicatedExchanges.erase(name);
}
}
void BrokerReplicator::doEventBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
- broker.getExchanges().find(values[EXNAME].asString());
+ exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
- broker.getQueues().find(values[QNAME].asString());
- // We only replicate binds for a replicated queue to replicated
- // exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queues.find(values[QNAME].asString());
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ // We only replicate binds for a replicated queue to replicated exchange
+ // that both exist locally. Respect the replication level set in the
+ // bind arguments, but replicate by default.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
- << " key=" << key);
+ << " key=" << key
+ << " args=" << args);
exchange->bind(queue, key, &args, 0);
}
}
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
- broker.getExchanges().find(values[EXNAME].asString());
+ exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
- broker.getQueues().find(values[QNAME].asString());
+ queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -424,7 +594,17 @@ void BrokerReplicator::doEventUnbind(Var
void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
Variant::List members = values[MEMBERS].asList();
- haBroker.setMembership(members);
+ setMembership(members);
+}
+
+void BrokerReplicator::doEventSubscribe(Variant::Map& values) { // marks the queue's replicator as subscribed when a subscribe event arrives
+ // Ignore queue replicator subscriptions (destination is a replicator name).
+ if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
+ if (qr) { // nothing to do when no replicator was found for this queue
+ qr->setSubscribed();
+ QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+ }
}
namespace {
@@ -441,40 +621,68 @@ string getAltExchange(const types::Varia
}
else return string();
}
+
+Variant getHaUuid(const Variant::Map& map) { // returns the value stored under QPID_HA_UUID, or a default-constructed Variant if the key is absent
+ Variant::Map::const_iterator i = map.find(QPID_HA_UUID);
+ return i == map.end() ? Variant() : i->second;
}
+} // namespace
+
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGUMENTS].asMap(),
- values[AUTODELETE].asBool(),
- values[EXCLUSIVE].asBool()))
- return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
+ if (!queueTracker.get())
+ throw Exception(QPID_MSG("Unexpected queue response: " << values));
+ if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
+ // If we see a queue with the same name as one we have, but not the same UUID,
+ // then replace the one we have.
+ boost::shared_ptr<Queue> queue = queues.find(name);
+ if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
+ QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
+ << name);
+ deleteQueue(name);
+ }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- boost::shared_ptr<Queue> queue =
- createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
- getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the queue to already exist if we are failing over.
- if (queue) startQueueReplicator(queue);
- else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ boost::shared_ptr<QueueReplicator> qr = replicateQueue(
+ name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
+ if (qr) {
+ Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
+ if (i != values.end() && isIntegerType(i->second.getType())) {
+ if (i->second.asInt64()) qr->setSubscribed();
+ }
+ }
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.replicateLevel(argsMap)) return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name = values[NAME].asString();
+ if (!exchangeTracker.get())
+ throw Exception(QPID_MSG("Unexpected exchange response: " << values));
+ if (!exchangeTracker->response(name)) return; // Response is out of date.
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- boost::shared_ptr<Exchange> exchange = createExchange(
+ // If we see an exchange with the same name as one we have, but not the same UUID,
+ // then replace the one we have.
+ boost::shared_ptr<Exchange> exchange = exchanges.find(name);
+ if (exchange &&
+ exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
+ {
+ QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
+ << name);
+ deleteExchange(name);
+ }
+ CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the exchange to already exist if we are failing over.
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
+ replicatedExchanges.insert(name);
}
namespace {
@@ -501,19 +709,25 @@ const std::string QUEUE_REF("queueRef");
void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
+ boost::shared_ptr<Queue> queue = queues.find(qName);
+
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- // Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ // Automatically replicate binding if queue and exchange exist and are replicated.
+ // Respect replicate setting in binding args but default to replicated.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
- string key = values[KEY].asString();
+ string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
- << " key:" << key);
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ << " key:" << key
+ << " args:" << args);
+// framing::FieldTable args;
+// qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
exchange->bind(queue, key, &args, 0);
}
}
@@ -527,42 +741,65 @@ void BrokerReplicator::doResponseHaBroke
try {
QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = replicationTest.replicateLevel(
- values[REPLICATE_DEFAULT].asString());
+ ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
- haBroker.setMembership(values[MEMBERS].asList());
+ setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()
- << ": " << values);
- haBroker.shutdown();
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
+ << ": " << values));
+
throw;
}
}
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
+boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
+ const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+ if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
- if (!broker.getExchanges().registerExchange(qr))
+ if (!exchanges.registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
+ return qr;
}
+ return boost::shared_ptr<QueueReplicator>();
}
-void BrokerReplicator::stopQueueReplicator(const std::string& name) {
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
- if (qr) {
- qr->deactivate();
- // QueueReplicator's bridge is now queued for destruction but may not
- // actually be destroyed.
- broker.getExchanges().destroy(qr->getName());
+void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { // no-op when no queue called name is found; other callers in this file pass only name, so purge presumably has a default in the header -- TODO confirm
+ Queue::shared_ptr queue = queues.find(name);
+ if (queue) {
+ // Purge before deleting to ensure that we don't reroute any
+ // messages. Any reroutes will be done at the primary and
+ // replicated as normal.
+ if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); // passes a null exchange pointer to purge()
+ broker.deleteQueue(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
-boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+void BrokerReplicator::deleteExchange(const std::string& name) { // deletes the named exchange; logs and returns without deleting if it is missing or in use as an alternate
+ try {
+ boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+ if (!exchange) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+ return;
+ }
+ if (exchange->inUseAsAlternate()) {
+ QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name); // NOTE(review): unlike the other log statements in this function, this one omits logPrefix
+ return;
+ }
+ broker.deleteExchange(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
+ } catch (const framing::NotFoundException&) { // logged and swallowed; any other exception propagates to the caller
+ QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
+ }
+}
+
+boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -571,7 +808,7 @@ boost::shared_ptr<Queue> BrokerReplicato
{
QueueSettings settings(durable, autodelete);
settings.populate(arguments, settings.storeSettings);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+ CreateQueueResult result =
broker.createQueue(
name,
settings,
@@ -579,24 +816,23 @@ boost::shared_ptr<Queue> BrokerReplicato
string(), // Set alternate exchange below
userId,
remoteHost);
- if (result.second) {
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
- }
- return result.first;
+ boost::shared_ptr<QueueReplicator> qr;
+ if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
+ if (result.second && !alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
- else return boost::shared_ptr<Queue>();
+ return qr;
}
-boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange)
{
- std::pair<boost::shared_ptr<Exchange>, bool> result =
+ CreateExchangeResult result =
broker.createExchange(
name,
type,
@@ -605,15 +841,12 @@ boost::shared_ptr<Exchange> BrokerReplic
args,
userId,
remoteHost);
- if (result.second) {
- alternates.addExchange(result.first);
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
- }
- return result.first;
+ alternates.addExchange(result.first);
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
}
- else return boost::shared_ptr<Exchange>();
+ return result;
}
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
@@ -626,4 +859,61 @@ void BrokerReplicator::write(char* /*tar
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+// Apply auto-delete handling to one exchange, called for every registered
+// exchange from disconnected(). Exchanges that are not QueueReplicators are
+// ignored. For a subscribed replicator of an auto-delete queue, either start
+// the auto-delete timer (when a delay is configured) or delete the queue now.
+void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+    boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+    if (!qr) return;            // Not a queue replicator, nothing to do.
+    if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+        if (qr->getQueue()->getSettings().autoDeleteDelay) {
+            // Start the auto-delete timer
+            Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+        }
+        else {
+            // Delete immediately. Don't purge, the primary is gone so we need
+            // to reroute the deleted messages.
+            deleteQueue(qr->getQueue()->getName(), false);
+        }
+    }
+}
+
+// Callback function for accumulating exchange candidates
+namespace {
+ void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
+ c.push_back(i);
+ }
+}
+
+void BrokerReplicator::disconnected() {
+ QPID_LOG(info, logPrefix << "Disconnected from " << primary);
+ connection = 0;
+ // Clean up auto-delete queues
+ vector<boost::shared_ptr<Exchange> > collect;
+ // Make a copy so we can work outside the ExchangeRegistry lock
+ exchanges.eachExchange(
+ boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
+ for_each(collect.begin(), collect.end(),
+ boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+}
+
+// Replace the cluster membership with the list received from the primary and
+// apply any change the primary has made to this broker's own status.
+void BrokerReplicator::setMembership(const Variant::List& brokers) {
+    Membership& membership(haBroker.getMembership());
+    membership.assign(brokers);
+    // Check if the primary has signalled a change in my status:
+    // from CATCHUP to READY when we are caught up.
+    // from READY to CATCHUP if we are timed out during fail-over.
+    BrokerInfo info;
+    if (membership.get(membership.getSelf(), info)) {
+        BrokerStatus oldStatus = haBroker.getStatus();
+        BrokerStatus newStatus = info.getStatus();
+        if (oldStatus == CATCHUP && newStatus == READY) {
+            // Log the prefix once only.
+            QPID_LOG(info, logPrefix << "Caught-up and ready");
+            membership.setStatus(READY);
+        }
+        else if (oldStatus == READY && newStatus == CATCHUP) {
+            QPID_LOG(info, logPrefix << "No longer ready, catching up");
+            membership.setStatus(CATCHUP);
+        }
+    }
+}
+
}} // namespace broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h Thu Feb 28 16:14:30 2013
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <set>
namespace qpid {
@@ -40,6 +41,9 @@ class Broker;
class Link;
class Bridge;
class SessionHandler;
+class Connection;
+class QueueRegistry;
+class ExchangeRegistry;
}
namespace framing {
@@ -58,7 +62,9 @@ class QueueReplicator;
* exchanges and bindings to replicate the primary.
* It also creates QueueReplicators for newly replicated queues.
*
- * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
+ * THREAD UNSAFE:
+ * All members except shutdown are only called in the Link's connection thread context.
+ * shutdown() does not use any mutable state.
*
*/
class BrokerReplicator : public broker::Exchange,
@@ -76,6 +82,7 @@ class BrokerReplicator : public broker::
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ void shutdown();
// DataSource interface - used to write persistence data to async store
uint64_t getSize();
@@ -83,8 +90,20 @@ class BrokerReplicator : public broker::
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+ typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
+ typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
- void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ typedef std::pair<std::string,std::string> EventKey;
+ typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
+ typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+
+ typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+
+ class UpdateTracker;
+ class ErrorListener;
+ class ConnectionObserver;
+
+ void connected(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -93,6 +112,7 @@ class BrokerReplicator : public broker::
void doEventBind(types::Variant::Map&);
void doEventUnbind(types::Variant::Map&);
void doEventMembersUpdate(types::Variant::Map&);
+ void doEventSubscribe(types::Variant::Map&);
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
@@ -100,32 +120,50 @@ class BrokerReplicator : public broker::
void doResponseHaBroker(types::Variant::Map& values);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
- void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- void stopQueueReplicator(const std::string& name);
+ QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- boost::shared_ptr<broker::Queue> createQueue(
+ QueueReplicatorPtr replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange);
- boost::shared_ptr<broker::Exchange> createExchange(
+ CreateExchangeResult createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy);
+ void deleteQueue(const std::string& name, bool purge=true);
+ void deleteExchange(const std::string& name);
+
+ void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
+
+ void disconnected();
+
+ void setMembership(const types::Variant::List&); // Set membership from list.
+
std::string logPrefix;
- std::string userId, remoteHost;
ReplicationTest replicationTest;
+ std::string userId, remoteHost;
HaBroker& haBroker;
broker::Broker& broker;
+ broker::ExchangeRegistry& exchanges;
+ broker::QueueRegistry& queues;
boost::shared_ptr<broker::Link> link;
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
+ typedef std::set<std::string> StringSet;
+ StringSet replicatedExchanges; // exchanges that have been replicated.
+ broker::Connection* connection;
+ EventDispatchMap dispatch;
+ std::auto_ptr<UpdateTracker> queueTracker;
+ std::auto_ptr<UpdateTracker> exchangeTracker;
+ boost::shared_ptr<ConnectionObserver> connectionObserver;
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp Thu Feb 28 16:14:30 2013
@@ -30,7 +30,7 @@ namespace qpid {
namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
+ : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
@@ -41,9 +41,11 @@ bool ConnectionObserver::getBrokerInfo(c
return false;
}
-void ConnectionObserver::setObserver(const ObserverPtr& o){
+void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
+{
sys::Mutex::ScopedLock l(lock);
observer = o;
+ logPrefix = newlogPrefix;
}
ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h Thu Feb 28 16:14:30 2013
@@ -55,7 +55,7 @@ class ConnectionObserver : public broker
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
- void setObserver(const ObserverPtr&);
+ void setObserver(const ObserverPtr&, const std::string& logPrefix);
ObserverPtr getObserver();
void opened(broker::Connection& connection);
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp Thu Feb 28 16:14:30 2013
@@ -26,6 +26,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StandAlone.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -41,7 +42,6 @@
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
@@ -54,134 +54,100 @@ using namespace std;
using types::Variant;
using types::Uuid;
using sys::Mutex;
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix("Broker: "),
- broker(b),
- systemId(broker.getSystem()->getSystemId().data()),
+ : systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ broker(b),
observer(new ConnectionObserver(*this, systemId)),
- mgmtObject(0),
- status(STANDALONE),
- membership(systemId),
- replicationTest(s.replicateDefault.get())
+ role(new StandAlone),
+ membership(BrokerInfo(systemId, STANDALONE), *this)
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, logPrefix << "Rejecting client connections.");
- observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
- new BackupConnectionExcluder));
+ QPID_LOG(debug, "Broker startup, rejecting client connections.");
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
+ observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
}
}
+namespace {
+const std::string NONE("none");
+bool isNone(const std::string& x) { return x.empty() || x == NONE; }
+}
+
// Called in Plugin::initialize
void HaBroker::initialize() {
-
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
- brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(),
- broker.getPort(broker::Broker::TCP_TRANSPORT),
- systemId);
-
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+ membership.add(
+ BrokerInfo(
+ membership.getSelf(),
+ settings.cluster ? JOINING : membership.getStatus(),
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT)
+ )
+ );
+ QPID_LOG(notice, "Initializing: " << membership.getInfo());
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
if (settings.cluster && !ma)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker"));
mgmtObject->set_replicateDefault(settings.replicateDefault.str());
mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
+ membership.setMgmtObject(mgmtObject);
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
- boost::shared_ptr<ReplicatingSubscription::Factory>(
+ shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
- status = JOINING;
- backup.reset(new Backup(*this, settings));
+ assert(membership.getStatus() == JOINING);
+ role.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+ if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
-
- if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
- if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
-
-
- // NOTE: lock is not needed in a constructor, but create one
- // to pass to functions that have a ScopedLock parameter.
- Mutex::ScopedLock l(lock);
- statusChanged(l);
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo);
+ QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
-void HaBroker::recover() {
- auto_ptr<Backup> b;
- {
- Mutex::ScopedLock l(lock);
- // No longer replicating, close link. Note: link must be closed before we
- // setStatus(RECOVERING) as that will remove our broker info from the
- // outgoing link properties so we won't recognize self-connects.
- b = backup;
- }
- b.reset(); // Call destructor outside of lock.
- BrokerInfo::Set backups;
- {
- Mutex::ScopedLock l(lock);
- setStatus(RECOVERING, l);
- backups = membership.otherBackups();
- membership.reset(brokerInfo);
- // Drop the lock, new Primary may call back on activate.
- }
- // Outside of lock, may call back on activate()
- primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
- switch (getStatus()) {
- case JOINING: recover(); break;
- case CATCHUP:
- QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
- throw Exception("Still catching up, cannot be promoted.");
- break;
- case READY: recover(); break;
- case RECOVERING: break;
- case ACTIVE: break;
- case STANDALONE: break;
- }
- break;
+ Role* r = role->promote();
+ if (r) role.reset(r);
+ break;
}
case _qmf::HaBroker::METHOD_SETBROKERSURL: {
setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_SETPUBLICURL: {
- setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+ setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
- boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
Uuid uuid(true);
@@ -191,10 +157,10 @@ Manageable::status_t HaBroker::Managemen
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
- boost::shared_ptr<broker::Link> link = result.first;
+ shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(
+ shared_ptr<QueueReplicator> qr(
new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
@@ -207,31 +173,23 @@ Manageable::status_t HaBroker::Managemen
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url) {
+void HaBroker::setPublicUrl(const Url& url) {
Mutex::ScopedLock l(lock);
- if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
- clientUrl = url;
- updateClientUrl(l);
-}
-
-void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
- Url url = clientUrl.empty() ? brokerUrl : clientUrl;
- if (url.empty()) throw Url::Invalid("HA client URL is empty");
+ publicUrl = url;
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
+ QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
- Mutex::ScopedLock l(lock);
- if (url.empty()) throw Url::Invalid("HA broker URL is empty");
- brokerUrl = url;
- mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- if (backup.get()) backup->setBrokerUrl(brokerUrl);
- // Updating broker URL also updates defaulted client URL:
- if (clientUrl.empty()) updateClientUrl(l);
+ {
+ Mutex::ScopedLock l(lock);
+ brokerUrl = url;
+ mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+ }
+ role->setBrokerUrl(url); // Outside lock
}
std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -239,116 +197,14 @@ std::vector<Url> HaBroker::getKnownBroke
return knownBrokers;
}
-void HaBroker::shutdown() {
- QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, message);
broker.shutdown();
+ throw Exception(message);
}
BrokerStatus HaBroker::getStatus() const {
- Mutex::ScopedLock l(lock);
- return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
- Mutex::ScopedLock l(lock);
- setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
- // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
- static const BrokerStatus TRANSITIONS[][2] = {
- { JOINING, CATCHUP }, // Connected to primary
- { JOINING, RECOVERING }, // Chosen as initial primary.
- { CATCHUP, READY }, // Caught up all queues, ready to take over.
- { READY, RECOVERING }, // Chosen as new primary
- { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
- { RECOVERING, ACTIVE } // All expected backups are ready
- };
- static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
- for (size_t i = 0; i < N; ++i) {
- if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
- return true;
- }
- return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
- QPID_LOG(info, logPrefix << "Status change: "
- << printable(status) << " -> " << printable(newStatus));
- bool legal = checkTransition(status, newStatus);
- assert(legal);
- if (!legal) {
- QPID_LOG(critical, logPrefix << "Illegal state transition: "
- << printable(status) << " -> " << printable(newStatus));
- shutdown();
- }
- status = newStatus;
- statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
- mgmtObject->set_status(printable(status).str());
- brokerInfo.setStatus(status);
- setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
- QPID_LOG(info, logPrefix << "Membership changed: " << membership);
- Variant::List brokers = membership.asList();
- mgmtObject->set_members(brokers);
- broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
- Mutex::ScopedLock l(lock);
- membership.assign(brokers);
- QPID_LOG(info, logPrefix << "Membership update: " << membership);
- BrokerInfo info;
- // Update my status to what the primary says it is. The primary can toggle
- // status between READY and CATCHUP based on the state of our subscriptions.
- if (membership.get(systemId, info) && status != info.getStatus()) {
- setStatus(info.getStatus(), l);
- if (backup.get()) backup->setStatus(status);
- }
- membershipUpdated(l);
-}
-
-void HaBroker::resetMembership(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.reset(b);
- QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
- membershipUpdated(l);
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b);
- membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
- Mutex::ScopedLock l(lock);
- membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id);
- membershipUpdated(l);
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
- framing::FieldTable linkProperties = broker.getLinkClientProperties();
- if (isBackup(status)) {
- // If this is a backup then any outgoing links are backup
- // links and need to be tagged.
- linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
- }
- else {
- // If this is a primary then any outgoing links are federation links
- // and should not be tagged.
- linkProperties.erase(ConnectionObserver::BACKUP_TAG);
- }
- broker.setLinkClientProperties(linkProperties);
+ return membership.getStatus();
}
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h Thu Feb 28 16:14:30 2013
@@ -25,14 +25,12 @@
#include "BrokerInfo.h"
#include "Membership.h"
#include "types.h"
-#include "ReplicationTest.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qpid/management/Manageable.h"
#include "qpid/types/Variant.h"
-#include <memory>
#include <set>
#include <boost/shared_ptr.hpp>
@@ -54,11 +52,15 @@ namespace ha {
class Backup;
class ConnectionObserver;
class Primary;
-
+class Role;
/**
* HA state and actions associated with a HA broker. Holds all the management info.
*
* THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+
+ * NOTE: HaBroker and Role subclasses follow this lock hierarchy:
+ * - HaBroker MUST NOT hold its own lock across calls to Role subclasses.
+ * - Role subclasses MAY hold their locks across calls to HaBroker.
*/
class HaBroker : public management::Manageable
{
@@ -71,66 +73,46 @@ class HaBroker : public management::Mana
void initialize();
// Implement Manageable.
- qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
broker::Broker& getBroker() { return broker; }
const Settings& getSettings() const { return settings; }
- /** Shut down the broker. Caller should log a critical error message. */
- void shutdown();
+ /** Shut down the broker because of a critical error. */
+ void shutdown(const std::string& message);
BrokerStatus getStatus() const;
- void setStatus(BrokerStatus);
- void activate();
-
- Backup* getBackup() { return backup.get(); }
- ReplicationTest getReplicationTest() const { return replicationTest; }
-
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
- const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-
- void setMembership(const types::Variant::List&); // Set membership from list.
- void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
- void addBroker(const BrokerInfo& b); // Add a broker to the membership.
- void removeBroker(const types::Uuid& id); // Remove a broker from membership.
-
+ BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+ Membership& getMembership() { return membership; }
types::Uuid getSystemId() const { return systemId; }
private:
- void setClientUrl(const Url&);
+
+ void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
- bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
-
- void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
- void recover();
- void statusChanged(sys::Mutex::ScopedLock&);
- void setLinkProperties(sys::Mutex::ScopedLock&);
-
std::vector<Url> getKnownBrokers() const;
- void membershipUpdated(sys::Mutex::ScopedLock&);
-
- std::string logPrefix;
- broker::Broker& broker;
- types::Uuid systemId;
+ // Immutable members
+ const types::Uuid systemId;
const Settings settings;
+ // Member variables protected by lock
mutable sys::Mutex lock;
- boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
- std::auto_ptr<Backup> backup;
- std::auto_ptr<Primary> primary;
- qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
- Url clientUrl, brokerUrl;
+ Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- BrokerStatus status;
- BrokerInfo brokerInfo;
+
+ // Independently thread-safe member variables
+ broker::Broker& broker;
+ qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
+ boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
+ boost::shared_ptr<Role> role;
Membership membership;
- ReplicationTest replicationTest;
};
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp Thu Feb 28 16:14:30 2013
@@ -33,9 +33,11 @@ struct Options : public qpid::Options {
addOptions()
("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
+ ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"),
+ "Enable replication of specific queues without joining a cluster")
("ha-brokers-url", optValue(settings.brokerUrl,"URL"),
"URL with address of each broker in the cluster.")
- ("ha-public-url", optValue(settings.clientUrl,"URL"),
+ ("ha-public-url", optValue(settings.publicUrl,"URL"),
"URL advertized to clients to connect to the cluster.")
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
@@ -48,6 +50,10 @@ struct Options : public qpid::Options {
"Authentication mechanism for connections between HA brokers")
("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
"Maximum time to wait for an expected backup to connect and become ready.")
+ ("ha-flow-messages", optValue(settings.flowMessages, "N"),
+ "Flow control message count limit for replication, 0 means no limit")
+ ("ha-flow-bytes", optValue(settings.flowBytes, "N"),
+ "Flow control byte limit for replication, 0 means no limit")
;
}
};
@@ -64,17 +70,23 @@ struct HaPlugin : public Plugin {
void earlyInitialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) {
- // Must create the HaBroker in earlyInitialize so it can set up its
- // connection observer before clients start connecting.
- haBroker.reset(new ha::HaBroker(*broker, settings));
- broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
+ if (broker && (settings.cluster || settings.queueReplication)) {
+ if (!broker->getManagementAgent()) {
+ QPID_LOG(info, "HA plugin disabled because management is disabled");
+ if (settings.cluster)
+ throw Exception("Cannot start HA: management is disabled");
+ } else {
+ // Must create the HaBroker in earlyInitialize so it can set up its
+ // connection observer before clients start connecting.
+ haBroker.reset(new ha::HaBroker(*broker, settings));
+ broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
+ }
}
}
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) haBroker->initialize();
+ if (broker && haBroker.get()) haBroker->initialize();
}
void finalize() {
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp Thu Feb 28 16:14:30 2013
@@ -19,6 +19,12 @@
*
*/
#include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iterator>
@@ -26,37 +32,58 @@
namespace qpid {
namespace ha {
+namespace _qmf = ::qmf::org::apache::qpid::ha;
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+ : haBroker(b), self(info.getSystemId())
+{
+ brokers[self] = info;
+}
+
+void Membership::clear() {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo me = brokers[self];
brokers.clear();
- brokers[b.getSystemId()] = b;
+ brokers[self] = me;
}
void Membership::add(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
+ if (id == self) return; // Never remove myself
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- }
+ update(l);
+ }
}
bool Membership::contains(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
- brokers.clear();
+ Mutex::ScopedLock l(lock);
+ clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
+ update(l);
}
types::Variant::List Membership::asList() const {
+ Mutex::ScopedLock l(lock);
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
@@ -64,6 +91,7 @@ types::Variant::List Membership::asList(
}
BrokerInfo::Set Membership::otherBackups() const {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +99,84 @@ BrokerInfo::Set Membership::otherBackups
return result;
}
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
return true;
}
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
- return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+ QPID_LOG(info, "Membership: " << brokers);
+ Variant::List brokers = asList();
+ if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+ if (mgmtObject) mgmtObject->set_members(brokers);
+ haBroker.getBroker().getManagementAgent()->raiseEvent(
+ _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+ Mutex::ScopedLock l(lock);
+ mgmtObject = mo;
+ update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+ BrokerStatus status = getStatus();
+ QPID_LOG(info, "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ if (!legal) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+ << " -> " << printable(newStatus)));
+ }
+
+ Mutex::ScopedLock l(lock);
+ brokers[self].setStatus(newStatus);
+ if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+ update(l);
+}
+
+BrokerStatus Membership::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second;
}
+// FIXME aconway 2013-01-23: move to .h?
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h Thu Feb 28 16:14:30 2013
@@ -24,45 +24,72 @@
#include "BrokerInfo.h"
#include "types.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/types/Variant.h"
#include <boost/function.hpp>
#include <set>
#include <vector>
#include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
namespace ha {
+class HaBroker;
/**
* Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management events when membership changes.
+ * THREAD SAFE
*/
class Membership
{
public:
- Membership(const types::Uuid& self_) : self(self_) {}
+ Membership(const BrokerInfo& info, HaBroker&);
+
+ void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
- void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+ void clear(); ///< Clear all but self.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
+
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
- bool get(const types::Uuid& id, BrokerInfo& result);
+ bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+ types::Uuid getSelf() const { return self; }
+ BrokerInfo getInfo() const;
+ BrokerStatus getStatus() const;
+ void setStatus(BrokerStatus s);
private:
- types::Uuid self;
+ void update(sys::Mutex::ScopedLock&);
+ BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+ mutable sys::Mutex lock;
+ HaBroker& haBroker;
+ boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+ const types::Uuid self;
BrokerInfo::Map brokers;
- friend std::ostream& operator<<(std::ostream&, const Membership&);
};
-std::ostream& operator<<(std::ostream&, const Membership&);
-
}} // namespace qpid::ha
#endif /*!QPID_HA_MEMBERSHIP_H*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org