You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [10/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Feb 28 16:14:30 2013
@@ -23,16 +23,19 @@
 #include "QueueReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventUnbind.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -41,6 +44,7 @@
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventSubscribe.h"
 #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include <boost/bind.hpp>
 #include <algorithm>
 #include <sstream>
 #include <iostream>
@@ -57,23 +61,25 @@ using qmf::org::apache::qpid::broker::Ev
 using qmf::org::apache::qpid::broker::EventQueueDelete;
 using qmf::org::apache::qpid::broker::EventSubscribe;
 using qmf::org::apache::qpid::ha::EventMembersUpdate;
+using qpid::broker::amqp_0_10::MessageTransfer;
 using namespace framing;
-using std::string;
+using namespace std;
 using std::ostream;
 using types::Variant;
 using namespace broker;
 
 namespace {
 
-const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
 
 const string CLASS_NAME("_class_name");
 const string EVENT("_event");
 const string OBJECT_NAME("_object_name");
 const string PACKAGE_NAME("_package_name");
 const string QUERY_RESPONSE("_query_response");
-const string SCHEMA_ID("_schema_id");
 const string VALUES("_values");
+const string SCHEMA_ID("_schema_id");
+const string WHAT("_what");
 
 const string ALTEX("altEx");
 const string ALTEXCHANGE("altExchange");
@@ -81,24 +87,27 @@ const string ARGS("args");
 const string ARGUMENTS("arguments");
 const string AUTODEL("autoDel");
 const string AUTODELETE("autoDelete");
-const string EXCL("excl");
-const string EXCLUSIVE("exclusive");
 const string BIND("bind");
-const string UNBIND("unbind");
 const string BINDING("binding");
+const string BINDING_KEY("bindingKey");
 const string CREATED("created");
 const string DISP("disp");
+const string DEST("dest");
 const string DURABLE("durable");
 const string EXCHANGE("exchange");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
 const string EXNAME("exName");
 const string EXTYPE("exType");
+const string HA_BROKER("habroker");
 const string KEY("key");
 const string NAME("name");
+const string PARTIAL("partial");
 const string QNAME("qName");
 const string QUEUE("queue");
 const string TYPE("type");
-const string HA_BROKER("habroker");
-const string PARTIAL("partial");
+const string UNBIND("unbind");
+const string CONSUMER_COUNT("consumerCount");
 
 const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -107,10 +116,6 @@ const string QMF_CONTENT("qmf.content");
 const string QMF_DEFAULT_TOPIC("qmf.default.topic");
 const string QMF_OPCODE("qmf.opcode");
 
-const string _WHAT("_what");
-const string _CLASS_NAME("_class_name");
-const string _PACKAGE_NAME("_package_name");
-const string _SCHEMA_ID("_schema_id");
 const string OBJECT("OBJECT");
 const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
 const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
@@ -118,21 +123,18 @@ const string QMF_DEFAULT_DIRECT("qmf.def
 const string _QUERY_REQUEST("_query_request");
 const string BROKER("broker");
 const string MEMBERS("members");
-
-template <class T> bool match(Variant::Map& schema) {
-    return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
-}
+const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
 
 void sendQuery(const string& packageName, const string& className, const string& queueName,
                SessionHandler& sessionHandler)
 {
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     Variant::Map request;
-    request[_WHAT] = OBJECT;
+    request[WHAT] = OBJECT;
     Variant::Map schema;
-    schema[_CLASS_NAME] = className;
-    schema[_PACKAGE_NAME] = packageName;
-    request[_SCHEMA_ID] = schema;
+    schema[CLASS_NAME] = className;
+    schema[PACKAGE_NAME] = packageName;
+    request[SCHEMA_ID] = schema;
 
     AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
     method.setBof(true);
@@ -170,19 +172,144 @@ Variant::Map asMapVoid(const Variant& va
 }
 } // namespace
 
+// Listens for errors on the bridge session.
+class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+  public:
+    ErrorListener(const std::string& lp, BrokerReplicator& br) :
+        logPrefix(lp), brokerReplicator(br) {}
+
+    void connectionException(framing::connection::CloseCode, const std::string& msg) {
+        QPID_LOG(error, logPrefix << "Connection error: " << msg);
+    }
+    void channelException(framing::session::DetachCode, const std::string& msg) {
+        QPID_LOG(error, logPrefix << "Channel error: " << msg);
+    }
+    void executionException(framing::execution::ErrorCode, const std::string& msg) {
+        QPID_LOG(error, logPrefix << "Execution error: " << msg);
+    }
+    void detach() {
+        QPID_LOG(debug, logPrefix << "Session detached.");
+    }
+
+  private:
+    std::string logPrefix;
+    BrokerReplicator& brokerReplicator;
+};
+
+class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
+{
+  public:
+    ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
+    virtual void connection(Connection&) {}
+    virtual void opened(Connection&) {}
+
+    virtual void closed(Connection& c) {
+        if (brokerReplicator.link && &c == brokerReplicator.connection)
+            brokerReplicator.disconnected();
+    }
+    virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); }
+  private:
+    BrokerReplicator& brokerReplicator;
+};
+
+/** Keep track of queues or exchanges during the update process to solve 2
+ * problems.
+ *
+ * 1. Once all responses are processed, remove any queues/exchanges
+ * that were not mentioned as they no longer exist on the primary.
+ *
+ * 2. During the update if we see an event for an object we should
+ * ignore any subsequent responses for that object as they are out
+ * of date.
+ */
+class BrokerReplicator::UpdateTracker {
+  public:
+    typedef std::set<std::string> Names;
+    typedef boost::function<void (const std::string&)> CleanFn;
+
+    UpdateTracker(const std::string& type_, // "queue" or "exchange"
+                  CleanFn f, const ReplicationTest& rt)
+        : type(type_), cleanFn(f), repTest(rt) {}
+
+    /** Destructor cleans up remaining initial queues. */
+    ~UpdateTracker() {
+        // Don't throw in a destructor.
+        try { for_each(initial.begin(), initial.end(), cleanFn); }
+        catch (const std::exception& e) {
+            QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+        }
+    }
+
+    /** Add an exchange name */
+    void addExchange(Exchange::shared_ptr ex)  {
+        if (repTest.getLevel(*ex))
+            initial.insert(ex->getName());
+    }
+
+    /** Add a queue name. */
+    void addQueue(Queue::shared_ptr q) {
+        if (repTest.getLevel(*q))
+            initial.insert(q->getName());
+    }
+
+    /** Received an event for name */
+    void event(const std::string& name) {
+        initial.erase(name); // no longer a candidate for deleting
+        events.insert(name); // we have seen an event for this name
+    }
+
+    /** Received a response for name.
+     *@return true if this response should be processed, false if we have
+     *already seen an event for this object.
+     */
+    bool response(const std::string& name) {
+        initial.erase(name); // no longer a candidate for deleting
+        return events.find(name) == events.end(); // true if no event seen yet.
+    }
+
+  private:
+    void clean(const std::string& name) {
+        QPID_LOG(info, "Backup updated, deleting  " << type << " " << name);
+        cleanFn(name);
+    }
+
+    std::string type;
+    Names initial, events;
+    CleanFn cleanFn;
+    ReplicationTest repTest;
+};
+
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
-      logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
-      haBroker(hb), broker(hb.getBroker()), link(l),
+      logPrefix("Backup: "), replicationTest(NONE),
+      haBroker(hb), broker(hb.getBroker()),
+      exchanges(broker.getExchanges()), queues(broker.getQueues()),
+      link(l),
       initialized(false),
-      alternates(hb.getBroker().getExchanges())
-{}
+      alternates(hb.getBroker().getExchanges()),
+      connection(0)
+{
+    connectionObserver.reset(new ConnectionObserver(*this));
+    broker.getConnectionObservers().add(connectionObserver);
+    framing::FieldTable args = getArgs();
+    args.setString(QPID_REPLICATE, printable(NONE).str());
+    setArgs(args);
+
+    dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
+    dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
+    dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare;
+    dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete;
+    dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind;
+    dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind;
+    dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate;
+    dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe;
+}
 
 void BrokerReplicator::initialize() {
     // Can't do this in the constructor because we need a shared_ptr to this.
     types::Uuid uuid(true);
     const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
-    broker.getLinks().declare(
+    std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
         name,               // name for bridge
         *link,              // parent
         false,              // durable
@@ -195,21 +322,47 @@ void BrokerReplicator::initialize() {
         "",                 // excludes
         false,              // dynamic
         0,                  // sync?
-        // shared_ptr keeps this in memory until outstanding initializeBridge
+        // shared_ptr keeps this in memory until outstanding connected
         // calls are run.
-        boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
+        boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
     );
+    assert(result.second);
+    result.first->setErrorListener(
+        boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
 }
 
-BrokerReplicator::~BrokerReplicator() { }
+BrokerReplicator::~BrokerReplicator() { shutdown(); }
+
+namespace {
+void collectQueueReplicators(
+    const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+{
+    boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+    if (qr) collect.insert(qr);
+}
+} // namespace
+
+void BrokerReplicator::shutdown() {
+    // NOTE: this is called in a QMF dispatch thread, not the Link's connection
+    // thread.  It's OK to be unlocked because it doesn't use any mutable state,
+    // it only calls thread safe functions objects belonging to the Broker.
+
+    // Unregister with broker objects:
+    if (connectionObserver) {
+        broker.getConnectionObservers().remove(connectionObserver);
+        connectionObserver.reset();
+    }
+    broker.getExchanges().destroy(getName());
+}
 
 // This is called in the connection IO thread when the bridge is started.
-void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) {
     // Use the credentials of the outgoing Link connection for creating queues,
     // exchanges etc. We know link->getConnection() is non-zero because we are
     // being called in the connections thread context.
     //
-    assert(link->getConnection());
+    connection = link->getConnection();
+    assert(connection);
     userId = link->getConnection()->getUserId();
     remoteHost = link->getConnection()->getUrl();
 
@@ -221,6 +374,19 @@ void BrokerReplicator::initializeBridge(
              << " status:" << printable(haBroker.getStatus()));
     initialized = true;
 
+    exchangeTracker.reset(
+        new UpdateTracker("exchange",
+                          boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+                          replicationTest));
+    exchanges.eachExchange(
+        boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+
+    queueTracker.reset(
+        new UpdateTracker("queue",
+                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+                          replicationTest));
+    queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
 
@@ -231,9 +397,14 @@ void BrokerReplicator::initializeBridge(
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
     //subscribe to the queue
-    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
-    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+    FieldTable arguments;
+    arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+    peer.getMessage().subscribe(
+        queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
+        false/*exclusive*/, "", 0, arguments);
+    peer.getMessage().setFlowMode(args.i_dest, 1); // Window
+    peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages());
+    peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes());
 
     // Issue a query request for queues, exchanges, bindings and the habroker
     // using event queue as the reply-to address
@@ -247,12 +418,12 @@ void BrokerReplicator::route(Deliverable
     // We transition from JOINING->CATCHUP on the first message received from the primary.
     // Until now we couldn't be sure if we had a good connection to the primary.
     if (haBroker.getStatus() == JOINING) {
-        haBroker.setStatus(CATCHUP);
+        haBroker.getMembership().setStatus(CATCHUP);
         QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
     }
     Variant::List list;
     try {
-        if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
+        if (!MessageTransfer::isQMFv2(msg.getMessage()))
             throw Exception("Unexpected message, not QMF2 event or query response.");
         // decode as list
         string content = msg.getMessage().getContent();
@@ -264,13 +435,9 @@ void BrokerReplicator::route(Deliverable
                 QPID_LOG(trace, "Broker replicator event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
-                else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
-                else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
-                else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
-                else if (match<EventBind>(schema)) doEventBind(values);
-                else if (match<EventUnbind>(schema)) doEventUnbind(values);
-                else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
+                EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+                EventDispatchMap::iterator j = dispatch.find(key);
+                if (j != dispatch.end()) (this->*(j->second))(values);
             }
         } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -285,15 +452,21 @@ void BrokerReplicator::route(Deliverable
                 else if (type == BINDING) doResponseBind(values);
                 else if (type == HA_BROKER) doResponseHaBroker(values);
             }
-            if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
-                // We have received all of the exchange response.
+            if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
+                QPID_LOG(debug, logPrefix << "All exchange responses received.")
+                exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary
                 alternates.clear();
             }
+            if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
+                QPID_LOG(debug, logPrefix << "All queue responses received.");
+                queueTracker.reset(); // Clean up queues that no longer exist in the primary
+            }
         }
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
-                 << ": while handling: " << list);
-        haBroker.shutdown();
+;
+        haBroker.shutdown(
+            QPID_MSG(logPrefix << "Configuration replication failed: "
+                     << e.what() << ": while handling: " << list));
         throw;
     }
 }
@@ -301,31 +474,22 @@ void BrokerReplicator::route(Deliverable
 
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
     Variant::Map argsMap = asMapVoid(values[ARGS]);
-    bool autoDel = values[AUTODEL].asBool();
-    bool excl = values[EXCL].asBool();
-    if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+    if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
         string name = values[QNAME].asString();
         QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
+        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+        if (queueTracker.get()) queueTracker->event(name);
         framing::FieldTable args;
         qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
-        if (broker.getQueues().find(name)) {
-            QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
-            broker.getQueues().destroy(name);
-            stopQueueReplicator(name);
-        }
-        settings.populate(args, settings.storeSettings);
-        std::pair<boost::shared_ptr<Queue>, bool> result =
-            broker.createQueue(
-                name,
-                settings,
-                0 /*i.e. no owner regardless of exclusivity on master*/,
-                values[ALTEX].asString(),
-                userId,
-                remoteHost);
-        assert(result.second);  // Should be true since we destroyed existing queue above
-        startQueueReplicator(result.first);
+        if (queues.find(name)) {
+            QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: "
+                     << name);
+            deleteQueue(name);
+        }
+        replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
+                       values[ALTEX].asString());
     }
 }
 
@@ -333,7 +497,7 @@ boost::shared_ptr<QueueReplicator> Broke
     const std::string& qname)
 {
     string rname = QueueReplicator::replicatorName(qname);
-    boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+    boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname);
     return boost::dynamic_pointer_cast<QueueReplicator>(ex);
 }
 
@@ -341,79 +505,85 @@ void BrokerReplicator::doEventQueueDelet
     // The remote queue has already been deleted so replicator
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
-    boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+    boost::shared_ptr<Queue> queue = queues.find(name);
+    if (queue && replicationTest.getLevel(*queue)) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
-        stopQueueReplicator(name);
-        broker.deleteQueue(name, userId, remoteHost);
+        if (queueTracker.get()) queueTracker->event(name);
+        deleteQueue(name);
     }
 }
 
 void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGS]));
-    if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
-    if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+    if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
         string name = values[EXNAME].asString();
         QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
+        if (exchangeTracker.get()) exchangeTracker->event(name);
         framing::FieldTable args;
         qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
-        if (broker.getExchanges().find(name)) {
-            broker.getExchanges().destroy(name);
-            QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
-        }
-        boost::shared_ptr<Exchange> exchange =
-            createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
-        assert(exchange);
+        if (exchanges.find(name)) {
+            deleteExchange(name);
+            QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
+                     << name);
+        }
+        CreateExchangeResult result = createExchange(
+            name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
+            values[ALTEX].asString());
+        replicatedExchanges.insert(name);
+        assert(result.second);
     }
 }
 
 void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
     string name = values[EXNAME].asString();
-    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+    boost::shared_ptr<Exchange> exchange = exchanges.find(name);
     if (!exchange) {
-        QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
-    } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+        QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
+    } else if (!replicationTest.getLevel(*exchange)) {
         QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
     } else {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
-        broker.deleteExchange(name, userId, remoteHost);
+        if (exchangeTracker.get()) exchangeTracker->event(name);
+        deleteExchange(name);
+        replicatedExchanges.erase(name);
     }
 }
 
 void BrokerReplicator::doEventBind(Variant::Map& values) {
     boost::shared_ptr<Exchange> exchange =
-        broker.getExchanges().find(values[EXNAME].asString());
+        exchanges.find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
-        broker.getQueues().find(values[QNAME].asString());
-    // We only replicate binds for a replicated queue to replicated
-    // exchange that both exist locally.
-    if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+        queues.find(values[QNAME].asString());
+    framing::FieldTable args;
+    qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+    // We only replicate binds for a replicated queue to replicated exchange
+    // that both exist locally. Respect the replication level set in the
+    // bind arguments, but replicate by default.
+    if (exchange && replicationTest.getLevel(*exchange) &&
+        queue && replicationTest.getLevel(*queue) &&
+        ReplicationTest(ALL).getLevel(args))
     {
-        framing::FieldTable args;
-        qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
-                 << " key=" << key);
+                 << " key=" << key
+                 << " args=" << args);
         exchange->bind(queue, key, &args, 0);
     }
 }
 
 void BrokerReplicator::doEventUnbind(Variant::Map& values) {
     boost::shared_ptr<Exchange> exchange =
-        broker.getExchanges().find(values[EXNAME].asString());
+        exchanges.find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
-        broker.getQueues().find(values[QNAME].asString());
+        queues.find(values[QNAME].asString());
     // We only replicate unbinds for a replicated queue to replicated
     // exchange that both exist locally.
-    if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+    if (exchange && replicationTest.getLevel(*exchange) &&
+        queue && replicationTest.getLevel(*queue))
     {
-        framing::FieldTable args;
-        qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
@@ -424,7 +594,17 @@ void BrokerReplicator::doEventUnbind(Var
 
 void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
     Variant::List members = values[MEMBERS].asList();
-    haBroker.setMembership(members);
+    setMembership(members);
+}
+
+void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
+    // Ignore queue replicator subscriptions.
+    if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
+    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
+    if (qr) {
+        qr->setSubscribed();
+        QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+    }
 }
 
 namespace {
@@ -441,40 +621,68 @@ string getAltExchange(const types::Varia
     }
     else return string();
 }
+
+Variant getHaUuid(const Variant::Map& map) {
+    Variant::Map::const_iterator i = map.find(QPID_HA_UUID);
+    return i == map.end() ? Variant() : i->second;
 }
 
+} // namespace
+
+
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
-    if (!replicationTest.isReplicated(
-            CONFIGURATION,
-            values[ARGUMENTS].asMap(),
-            values[AUTODELETE].asBool(),
-            values[EXCLUSIVE].asBool()))
-        return;
+    if (!replicationTest.getLevel(argsMap)) return;
     string name(values[NAME].asString());
+    if (!queueTracker.get())
+        throw Exception(QPID_MSG("Unexpected queue response: " << values));
+    if (!queueTracker->response(name)) return; // Response is out-of-date
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
+    // If we see a queue with the same name as one we have, but not the same UUID,
+    // then replace the one we have.
+    boost::shared_ptr<Queue> queue = queues.find(name);
+    if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
+        QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
+                 << name);
+        deleteQueue(name);
+    }
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
-    boost::shared_ptr<Queue> queue =
-        createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
-                    getAltExchange(values[ALTEXCHANGE]));
-    // It is normal for the queue to already exist if we are failing over.
-    if (queue) startQueueReplicator(queue);
-    else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+    boost::shared_ptr<QueueReplicator> qr = replicateQueue(
+        name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+        getAltExchange(values[ALTEXCHANGE]));
+    if (qr) {
+        Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
+        if (i != values.end() && isIntegerType(i->second.getType())) {
+            if (i->second.asInt64()) qr->setSubscribed();
+        }
+    }
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
-    if (!replicationTest.replicateLevel(argsMap)) return;
+    if (!replicationTest.getLevel(argsMap)) return;
     string name = values[NAME].asString();
+    if (!exchangeTracker.get())
+        throw Exception(QPID_MSG("Unexpected exchange response: " << values));
+    if (!exchangeTracker->response(name)) return; // Response is out of date.
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
-    boost::shared_ptr<Exchange> exchange = createExchange(
+    // If we see an exchange with the same name as one we have, but not the same UUID,
+    // then replace the one we have.
+    boost::shared_ptr<Exchange> exchange = exchanges.find(name);
+    if (exchange &&
+        exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
+    {
+        QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
+                 << name);
+        deleteExchange(name);
+    }
+    CreateExchangeResult result = createExchange(
         name, values[TYPE].asString(), values[DURABLE].asBool(), args,
         getAltExchange(values[ALTEXCHANGE]));
-    // It is normal for the exchange to already exist if we are failing over.
-    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
+    replicatedExchanges.insert(name);
 }
 
 namespace {
@@ -501,19 +709,25 @@ const std::string QUEUE_REF("queueRef");
 void BrokerReplicator::doResponseBind(Variant::Map& values) {
     std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
     std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
-    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
-    boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+    boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
+    boost::shared_ptr<Queue> queue = queues.find(qName);
+
+    framing::FieldTable args;
+    qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
 
-    // Automatically replicate binding if queue and exchange exist and are replicated
-    if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+    // Automatically replicate binding if queue and exchange exist and are replicated.
+    // Respect replicate setting in binding args but default to replicated.
+    if (exchange && replicationTest.getLevel(*exchange) &&
+        queue && replicationTest.getLevel(*queue) &&
+        ReplicationTest(ALL).getLevel(args))
     {
-        string key = values[KEY].asString();
+        string key = values[BINDING_KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
                  << " queue:" << qName
-                 << " key:" << key);
-        framing::FieldTable args;
-        qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+                 << " key:" << key
+                 << " args:" << args);
+//        framing::FieldTable args;
+//        qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
         exchange->bind(queue, key, &args, 0);
     }
 }
@@ -527,42 +741,65 @@ void BrokerReplicator::doResponseHaBroke
     try {
         QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
         ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
-        ReplicateLevel primary = replicationTest.replicateLevel(
-            values[REPLICATE_DEFAULT].asString());
+        ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
         if (mine != primary)
             throw Exception(QPID_MSG("Replicate default on backup (" << mine
                                      << ") does not match primary (" <<  primary << ")"));
-        haBroker.setMembership(values[MEMBERS].asList());
+        setMembership(values[MEMBERS].asList());
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()
-                 << ": " << values);
-        haBroker.shutdown();
+        haBroker.shutdown(
+            QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
+                     << ": " << values));
+
         throw;
     }
 }
 
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
+boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
+    const boost::shared_ptr<Queue>& queue)
 {
-    if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+    if (replicationTest.getLevel(*queue) == ALL) {
         boost::shared_ptr<QueueReplicator> qr(
             new QueueReplicator(haBroker, queue, link));
-        if (!broker.getExchanges().registerExchange(qr))
+        if (!exchanges.registerExchange(qr))
             throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
         qr->activate();
+        return qr;
     }
+    return boost::shared_ptr<QueueReplicator>();
 }
 
-void BrokerReplicator::stopQueueReplicator(const std::string& name) {
-    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
-    if (qr) {
-        qr->deactivate();
-        // QueueReplicator's bridge is now queued for destruction but may not
-        // actually be destroyed.
-        broker.getExchanges().destroy(qr->getName());
+void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
+    Queue::shared_ptr queue = queues.find(name);
+    if (queue) {
+        // Purge before deleting to ensure that we don't reroute any
+        // messages. Any reroutes will be done at the primary and
+        // replicated as normal.
+        if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
+        broker.deleteQueue(name, userId, remoteHost);
+        QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
     }
 }
 
-boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+void BrokerReplicator::deleteExchange(const std::string& name) {
+    try {
+        boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+        if (!exchange) {
+            QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+            return;
+        }
+        if (exchange->inUseAsAlternate()) {
+            QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
+            return;
+        }
+        broker.deleteExchange(name, userId, remoteHost);
+        QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
+    } catch (const framing::NotFoundException&) {
+        QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
+    }
+}
+
+boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
     const std::string& name,
     bool durable,
     bool autodelete,
@@ -571,7 +808,7 @@ boost::shared_ptr<Queue> BrokerReplicato
 {
     QueueSettings settings(durable, autodelete);
     settings.populate(arguments, settings.storeSettings);
-    std::pair<boost::shared_ptr<Queue>, bool> result =
+    CreateQueueResult result =
         broker.createQueue(
             name,
             settings,
@@ -579,24 +816,23 @@ boost::shared_ptr<Queue> BrokerReplicato
             string(), // Set alternate exchange below
             userId,
             remoteHost);
-    if (result.second) {
-        if (!alternateExchange.empty()) {
-            alternates.setAlternate(
-                alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
-        }
-        return result.first;
+    boost::shared_ptr<QueueReplicator> qr;
+    if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
+    if (result.second && !alternateExchange.empty()) {
+        alternates.setAlternate(
+            alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
     }
-    else return  boost::shared_ptr<Queue>();
+    return qr;
 }
 
-boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
     const std::string& name,
     const std::string& type,
     bool durable,
     const qpid::framing::FieldTable& args,
     const std::string& alternateExchange)
 {
-    std::pair<boost::shared_ptr<Exchange>, bool> result =
+    CreateExchangeResult result =
         broker.createExchange(
             name,
             type,
@@ -605,15 +841,12 @@ boost::shared_ptr<Exchange> BrokerReplic
             args,
             userId,
             remoteHost);
-    if (result.second) {
-        alternates.addExchange(result.first);
-        if (!alternateExchange.empty()) {
-            alternates.setAlternate(
-                alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
-        }
-        return result.first;
+    alternates.addExchange(result.first);
+    if (!alternateExchange.empty()) {
+        alternates.setAlternate(
+            alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
     }
-    else return  boost::shared_ptr<Exchange>();
+    return result;
 }
 
 bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
@@ -626,4 +859,61 @@ void BrokerReplicator::write(char* /*tar
 
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
+void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+    boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+    if (!qr) return;
+    assert(qr);
+    if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+        if (qr->getQueue()->getSettings().autoDeleteDelay) {
+            // Start the auto-delete timer
+            Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+        }
+        else {
+            // Delete immediately. Don't purge, the primary is gone so we need
+            // to reroute the deleted messages.
+            deleteQueue(qr->getQueue()->getName(), false);
+        }
+    }
+}
+
+// Callback function for accumulating exchange candidates
+namespace {
+	void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
+		c.push_back(i);
+	}
+}
+
+void BrokerReplicator::disconnected() {
+    QPID_LOG(info, logPrefix << "Disconnected from " << primary);
+    connection = 0;
+    // Clean up auto-delete queues
+    vector<boost::shared_ptr<Exchange> > collect;
+    // Make a copy so we can work outside the ExchangeRegistry lock
+    exchanges.eachExchange(
+        boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
+    for_each(collect.begin(), collect.end(),
+             boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+}
+
+void BrokerReplicator::setMembership(const Variant::List& brokers) {
+    Membership& membership(haBroker.getMembership());
+    membership.assign(brokers);
+    // Check if the primary has signalled a change in my status:
+    // from CATCHUP to READY when we are caught up.
+    // from READY TO CATCHUP if we are timed out during fail-over.
+    BrokerInfo info;
+    if (membership.get(membership.getSelf(), info)) {
+        BrokerStatus oldStatus = haBroker.getStatus();
+        BrokerStatus newStatus = info.getStatus();
+        if (oldStatus == CATCHUP && newStatus == READY) {
+            QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
+            haBroker.getMembership().setStatus(READY);
+        }
+        else if (oldStatus == READY && newStatus == CATCHUP) {
+            QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
+            haBroker.getMembership().setStatus(CATCHUP);
+        }
+    }
+}
+
 }} // namespace broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h Thu Feb 28 16:14:30 2013
@@ -31,6 +31,7 @@
 #include "qpid/management/ManagementObject.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
+#include <set>
 
 namespace qpid {
 
@@ -40,6 +41,9 @@ class Broker;
 class Link;
 class Bridge;
 class SessionHandler;
+class Connection;
+class QueueRegistry;
+class ExchangeRegistry;
 }
 
 namespace framing {
@@ -58,7 +62,9 @@ class QueueReplicator;
  * exchanges and bindings to replicate the primary.
  * It also creates QueueReplicators for newly replicated queues.
  *
- * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
+ * THREAD UNSAFE:
+ * All members except shutdown are only called in the Link's connection thread context.
+ * shutdown() does not use any mutable state.
  *
  */
 class BrokerReplicator : public broker::Exchange,
@@ -76,6 +82,7 @@ class BrokerReplicator : public broker::
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
     void route(broker::Deliverable&);
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+    void shutdown();
 
     // DataSource interface - used to write persistence data to async store
     uint64_t getSize();
@@ -83,8 +90,20 @@ class BrokerReplicator : public broker::
 
   private:
     typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+    typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
+    typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
 
-    void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+    typedef std::pair<std::string,std::string> EventKey;
+    typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
+    typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+
+    typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+
+    class UpdateTracker;
+    class ErrorListener;
+    class ConnectionObserver;
+
+    void connected(broker::Bridge&, broker::SessionHandler&);
 
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
@@ -93,6 +112,7 @@ class BrokerReplicator : public broker::
     void doEventBind(types::Variant::Map&);
     void doEventUnbind(types::Variant::Map&);
     void doEventMembersUpdate(types::Variant::Map&);
+    void doEventSubscribe(types::Variant::Map&);
 
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);
@@ -100,32 +120,50 @@ class BrokerReplicator : public broker::
     void doResponseHaBroker(types::Variant::Map& values);
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
-    void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
-    void stopQueueReplicator(const std::string& name);
+    QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
-    boost::shared_ptr<broker::Queue> createQueue(
+    QueueReplicatorPtr replicateQueue(
         const std::string& name,
         bool durable,
         bool autodelete,
         const qpid::framing::FieldTable& arguments,
         const std::string& alternateExchange);
 
-    boost::shared_ptr<broker::Exchange> createExchange(
+    CreateExchangeResult createExchange(
         const std::string& name,
         const std::string& type,
         bool durable,
         const qpid::framing::FieldTable& args,
         const std::string& alternateExchange);
 
+    bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy);
+    void deleteQueue(const std::string& name, bool purge=true);
+    void deleteExchange(const std::string& name);
+
+    void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
+
+    void disconnected();
+
+    void setMembership(const types::Variant::List&); // Set membership from list.
+
     std::string logPrefix;
-    std::string userId, remoteHost;
     ReplicationTest replicationTest;
+    std::string userId, remoteHost;
     HaBroker& haBroker;
     broker::Broker& broker;
+    broker::ExchangeRegistry& exchanges;
+    broker::QueueRegistry& queues;
     boost::shared_ptr<broker::Link> link;
     bool initialized;
     AlternateExchangeSetter alternates;
     qpid::Address primary;
+    typedef std::set<std::string> StringSet;
+    StringSet replicatedExchanges; // exchanges that have been replicated.
+    broker::Connection* connection;
+    EventDispatchMap dispatch;
+    std::auto_ptr<UpdateTracker> queueTracker;
+    std::auto_ptr<UpdateTracker> exchangeTracker;
+    boost::shared_ptr<ConnectionObserver> connectionObserver;
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp Thu Feb 28 16:14:30 2013
@@ -30,7 +30,7 @@ namespace qpid {
 namespace ha {
 
 ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
-    : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
+    : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
 
 bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
     framing::FieldTable ft;
@@ -41,9 +41,11 @@ bool ConnectionObserver::getBrokerInfo(c
     return false;
 }
 
-void ConnectionObserver::setObserver(const ObserverPtr& o){
+void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
+{
     sys::Mutex::ScopedLock l(lock);
     observer = o;
+    logPrefix = newlogPrefix;
 }
 
 ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h Thu Feb 28 16:14:30 2013
@@ -55,7 +55,7 @@ class ConnectionObserver : public broker
 
     ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
 
-    void setObserver(const ObserverPtr&);
+    void setObserver(const ObserverPtr&, const std::string& logPrefix);
     ObserverPtr getObserver();
 
     void opened(broker::Connection& connection);

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp Thu Feb 28 16:14:30 2013
@@ -26,6 +26,7 @@
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
+#include "StandAlone.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
@@ -41,7 +42,6 @@
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
 
@@ -54,134 +54,100 @@ using namespace std;
 using types::Variant;
 using types::Uuid;
 using sys::Mutex;
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
 
 // Called in Plugin::earlyInitialize
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
-    : logPrefix("Broker: "),
-      broker(b),
-      systemId(broker.getSystem()->getSystemId().data()),
+    : systemId(b.getSystem()->getSystemId().data()),
       settings(s),
+      broker(b),
       observer(new ConnectionObserver(*this, systemId)),
-      mgmtObject(0),
-      status(STANDALONE),
-      membership(systemId),
-      replicationTest(s.replicateDefault.get())
+      role(new StandAlone),
+      membership(BrokerInfo(systemId, STANDALONE), *this)
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
-        QPID_LOG(debug, logPrefix << "Rejecting client connections.");
-        observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
-                              new BackupConnectionExcluder));
+        QPID_LOG(debug, "Broker startup, rejecting client connections.");
+        shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
+        observer->setObserver(excluder, "Backup: ");
         broker.getConnectionObservers().add(observer);
     }
 }
 
+namespace {
+const std::string NONE("none");
+bool isNone(const std::string& x) { return x.empty() || x == NONE; }
+}
+
 // Called in Plugin::initialize
 void HaBroker::initialize() {
-
     // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
-    brokerInfo = BrokerInfo(
-        broker.getSystem()->getNodeName(),
-        broker.getPort(broker::Broker::TCP_TRANSPORT),
-        systemId);
-
-    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+    membership.add(
+        BrokerInfo(
+            membership.getSelf(),
+            settings.cluster ? JOINING : membership.getStatus(),
+            broker.getSystem()->getNodeName(),
+            broker.getPort(broker::Broker::TCP_TRANSPORT)
+        )
+    );
+    QPID_LOG(notice, "Initializing: " << membership.getInfo());
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
     if (settings.cluster && !ma)
         throw Exception("Cannot start HA: management is disabled");
     _qmf::Package  packageInit(ma);
-    mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+    mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker"));
     mgmtObject->set_replicateDefault(settings.replicateDefault.str());
     mgmtObject->set_systemId(systemId);
     ma->addObject(mgmtObject);
+    membership.setMgmtObject(mgmtObject);
 
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
-        boost::shared_ptr<ReplicatingSubscription::Factory>(
+        shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
 
     // If we are in a cluster, start as backup in joining state.
     if (settings.cluster) {
-        status = JOINING;
-        backup.reset(new Backup(*this, settings));
+        assert(membership.getStatus() == JOINING);
+        role.reset(new Backup(*this, settings));
         broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+        if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+        if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
     }
-
-    if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
-    if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
-
-
-    // NOTE: lock is not needed in a constructor, but create one
-    // to pass to functions that have a ScopedLock parameter.
-    Mutex::ScopedLock l(lock);
-    statusChanged(l);
 }
 
 HaBroker::~HaBroker() {
-    QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo);
+    QPID_LOG(notice, role->getLogPrefix() << "Shut down");
     broker.getConnectionObservers().remove(observer);
 }
 
-void HaBroker::recover() {
-    auto_ptr<Backup> b;
-    {
-        Mutex::ScopedLock l(lock);
-        // No longer replicating, close link. Note: link must be closed before we
-        // setStatus(RECOVERING) as that will remove our broker info from the
-        // outgoing link properties so we won't recognize self-connects.
-        b = backup;
-    }
-    b.reset();                  // Call destructor outside of lock.
-    BrokerInfo::Set backups;
-    {
-        Mutex::ScopedLock l(lock);
-        setStatus(RECOVERING, l);
-        backups = membership.otherBackups();
-        membership.reset(brokerInfo);
-        // Drop the lock, new Primary may call back on activate.
-    }
-    // Outside of lock, may call back on activate()
-    primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
     switch (methodId) {
       case _qmf::HaBroker::METHOD_PROMOTE: {
-          switch (getStatus()) {
-            case JOINING: recover(); break;
-            case CATCHUP:
-              QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
-              throw Exception("Still catching up, cannot be promoted.");
-              break;
-            case READY: recover(); break;
-            case RECOVERING: break;
-            case ACTIVE: break;
-            case STANDALONE: break;
-          }
-          break;
+        Role* r = role->promote();
+        if (r) role.reset(r);
+        break;
       }
       case _qmf::HaBroker::METHOD_SETBROKERSURL: {
           setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
           break;
       }
       case _qmf::HaBroker::METHOD_SETPUBLICURL: {
-          setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+          setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
           break;
       }
       case _qmf::HaBroker::METHOD_REPLICATE: {
           _qmf::ArgsHaBrokerReplicate& bq_args =
               dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
-          QPID_LOG(debug, logPrefix << "Replicate individual queue "
+          QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
                    << bq_args.i_queue << " from " << bq_args.i_broker);
 
-          boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+          shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
           Url url(bq_args.i_broker);
           string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
           Uuid uuid(true);
@@ -191,10 +157,10 @@ Manageable::status_t HaBroker::Managemen
               false,              // durable
               settings.mechanism, settings.username, settings.password,
               false);           // no amq.failover - don't want to use client URL.
-          boost::shared_ptr<broker::Link> link = result.first;
+          shared_ptr<broker::Link> link = result.first;
           link->setUrl(url);
           // Create a queue replicator
-          boost::shared_ptr<QueueReplicator> qr(
+          shared_ptr<QueueReplicator> qr(
               new QueueReplicator(*this, queue, link));
           qr->activate();
           broker.getExchanges().registerExchange(qr);
@@ -207,31 +173,23 @@ Manageable::status_t HaBroker::Managemen
     return Manageable::STATUS_OK;
 }
 
-void HaBroker::setClientUrl(const Url& url) {
+void HaBroker::setPublicUrl(const Url& url) {
     Mutex::ScopedLock l(lock);
-    if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
-    clientUrl = url;
-    updateClientUrl(l);
-}
-
-void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
-    Url url = clientUrl.empty() ? brokerUrl : clientUrl;
-    if (url.empty()) throw Url::Invalid("HA client URL is empty");
+    publicUrl = url;
     mgmtObject->set_publicUrl(url.str());
     knownBrokers.clear();
     knownBrokers.push_back(url);
-    QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
+    QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
 }
 
 void HaBroker::setBrokerUrl(const Url& url) {
-    Mutex::ScopedLock l(lock);
-    if (url.empty()) throw Url::Invalid("HA broker URL is empty");
-    brokerUrl = url;
-    mgmtObject->set_brokersUrl(brokerUrl.str());
-    QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
-    if (backup.get()) backup->setBrokerUrl(brokerUrl);
-    // Updating broker URL also updates defaulted client URL:
-    if (clientUrl.empty()) updateClientUrl(l);
+    {
+        Mutex::ScopedLock l(lock);
+        brokerUrl = url;
+        mgmtObject->set_brokersUrl(brokerUrl.str());
+        QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+    }
+    role->setBrokerUrl(url); // Oustside lock
 }
 
 std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -239,116 +197,14 @@ std::vector<Url> HaBroker::getKnownBroke
     return knownBrokers;
 }
 
-void HaBroker::shutdown() {
-    QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+void HaBroker::shutdown(const std::string& message) {
+    QPID_LOG(critical, message);
     broker.shutdown();
+    throw Exception(message);
 }
 
 BrokerStatus HaBroker::getStatus() const {
-    Mutex::ScopedLock l(lock);
-    return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
-    Mutex::ScopedLock l(lock);
-    setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
-    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
-    static const BrokerStatus TRANSITIONS[][2] = {
-        { JOINING, CATCHUP },    // Connected to primary
-        { JOINING, RECOVERING }, // Chosen as initial primary.
-        { CATCHUP, READY },      // Caught up all queues, ready to take over.
-        { READY, RECOVERING },   // Chosen as new primary
-        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
-        { RECOVERING, ACTIVE }   // All expected backups are ready
-    };
-    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
-    for (size_t i = 0; i < N; ++i) {
-        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
-            return true;
-    }
-    return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
-    QPID_LOG(info, logPrefix << "Status change: "
-             << printable(status) << " -> " << printable(newStatus));
-    bool legal = checkTransition(status, newStatus);
-    assert(legal);
-    if (!legal) {
-        QPID_LOG(critical, logPrefix << "Illegal state transition: "
-                 << printable(status) << " -> " << printable(newStatus));
-        shutdown();
-    }
-    status = newStatus;
-    statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
-    mgmtObject->set_status(printable(status).str());
-    brokerInfo.setStatus(status);
-    setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
-    QPID_LOG(info, logPrefix << "Membership changed: " <<  membership);
-    Variant::List brokers = membership.asList();
-    mgmtObject->set_members(brokers);
-    broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
-    Mutex::ScopedLock l(lock);
-    membership.assign(brokers);
-    QPID_LOG(info, logPrefix << "Membership update: " <<  membership);
-    BrokerInfo info;
-    // Update my status to what the primary says it is.  The primary can toggle
-    // status between READY and CATCHUP based on the state of our subscriptions.
-    if (membership.get(systemId, info) && status != info.getStatus()) {
-        setStatus(info.getStatus(), l);
-        if (backup.get()) backup->setStatus(status);
-    }
-    membershipUpdated(l);
-}
-
-void HaBroker::resetMembership(const BrokerInfo& b) {
-    Mutex::ScopedLock l(lock);
-    membership.reset(b);
-    QPID_LOG(debug, logPrefix << "Membership reset to: " <<  membership);
-    membershipUpdated(l);
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
-    Mutex::ScopedLock l(lock);
-    membership.add(b);
-    QPID_LOG(debug, logPrefix << "Membership add: " <<  b);
-    membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
-    Mutex::ScopedLock l(lock);
-    membership.remove(id);
-    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id);
-    membershipUpdated(l);
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
-    framing::FieldTable linkProperties = broker.getLinkClientProperties();
-    if (isBackup(status)) {
-        // If this is a backup then any outgoing links are backup
-        // links and need to be tagged.
-        linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
-    }
-    else {
-        // If this is a primary then any outgoing links are federation links
-        // and should not be tagged.
-        linkProperties.erase(ConnectionObserver::BACKUP_TAG);
-    }
-    broker.setLinkClientProperties(linkProperties);
+    return membership.getStatus();
 }
 
 }} // namespace qpid::ha

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h Thu Feb 28 16:14:30 2013
@@ -25,14 +25,12 @@
 #include "BrokerInfo.h"
 #include "Membership.h"
 #include "types.h"
-#include "ReplicationTest.h"
 #include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
 #include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/types/Variant.h"
-#include <memory>
 #include <set>
 #include <boost/shared_ptr.hpp>
 
@@ -54,11 +52,15 @@ namespace ha {
 class Backup;
 class ConnectionObserver;
 class Primary;
-
+class Role;
 /**
  * HA state and actions associated with a HA broker. Holds all the management info.
  *
  * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+
+ * NOTE: HaBroker and Role subclasses follow this lock hierarchy:
+ * - HaBroker MUST NOT hold its own lock across calls Role subclasses.
+ * - Role subclasses MAY hold their locks accross calls to HaBroker.
  */
 class HaBroker : public management::Manageable
 {
@@ -71,66 +73,46 @@ class HaBroker : public management::Mana
     void initialize();
 
     // Implement Manageable.
-    qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+    qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
     management::Manageable::status_t ManagementMethod (
         uint32_t methodId, management::Args& args, std::string& text);
 
     broker::Broker& getBroker() { return broker; }
     const Settings& getSettings() const { return settings; }
 
-    /** Shut down the broker. Caller should log a critical error message. */
-    void shutdown();
+    /** Shut down the broker because of a critical error. */
+    void shutdown(const std::string& message);
 
     BrokerStatus getStatus() const;
-    void setStatus(BrokerStatus);
-    void activate();
-
-    Backup* getBackup() { return backup.get(); }
-    ReplicationTest getReplicationTest() const { return replicationTest; }
-
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
-    const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-
-    void setMembership(const types::Variant::List&); // Set membership from list.
-    void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
-    void addBroker(const BrokerInfo& b);       // Add a broker to the membership.
-    void removeBroker(const types::Uuid& id);  // Remove a broker from membership.
-
+    BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+    Membership& getMembership() { return membership; }
     types::Uuid getSystemId() const { return systemId; }
 
   private:
-    void setClientUrl(const Url&);
+
+    void setPublicUrl(const Url&);
     void setBrokerUrl(const Url&);
     void updateClientUrl(sys::Mutex::ScopedLock&);
 
-    bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
-
-    void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
-    void recover();
-    void statusChanged(sys::Mutex::ScopedLock&);
-    void setLinkProperties(sys::Mutex::ScopedLock&);
-
     std::vector<Url> getKnownBrokers() const;
 
-    void membershipUpdated(sys::Mutex::ScopedLock&);
-
-    std::string logPrefix;
-    broker::Broker& broker;
-    types::Uuid systemId;
+    // Immutable members
+    const types::Uuid systemId;
     const Settings settings;
 
+    // Member variables protected by lock
     mutable sys::Mutex lock;
-    boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
-    std::auto_ptr<Backup> backup;
-    std::auto_ptr<Primary> primary;
-    qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
-    Url clientUrl, brokerUrl;
+    Url publicUrl, brokerUrl;
     std::vector<Url> knownBrokers;
-    BrokerStatus status;
-    BrokerInfo brokerInfo;
+
+    // Independently thread-safe member variables
+    broker::Broker& broker;
+    qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
+    boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
+    boost::shared_ptr<Role> role;
     Membership membership;
-    ReplicationTest replicationTest;
 };
 }} // namespace qpid::ha
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaPlugin.cpp Thu Feb 28 16:14:30 2013
@@ -33,9 +33,11 @@ struct Options : public qpid::Options {
         addOptions()
             ("ha-cluster", optValue(settings.cluster, "yes|no"),
              "Join a HA active/passive cluster.")
+            ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"),
+             "Enable replication of specific queues without joining a cluster")
             ("ha-brokers-url", optValue(settings.brokerUrl,"URL"),
              "URL with address of each broker in the cluster.")
-            ("ha-public-url", optValue(settings.clientUrl,"URL"),
+            ("ha-public-url", optValue(settings.publicUrl,"URL"),
              "URL advertized to clients to connect to the cluster.")
             ("ha-replicate",
              optValue(settings.replicateDefault, "LEVEL"),
@@ -48,6 +50,10 @@ struct Options : public qpid::Options {
              "Authentication mechanism for connections between HA brokers")
             ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
              "Maximum time to wait for an expected backup to connect and become ready.")
+            ("ha-flow-messages", optValue(settings.flowMessages, "N"),
+             "Flow control message count limit for replication, 0 means no limit")
+            ("ha-flow-bytes", optValue(settings.flowBytes, "N"),
+             "Flow control byte limit for replication, 0 means no limit")
             ;
     }
 };
@@ -64,17 +70,23 @@ struct HaPlugin : public Plugin {
 
     void earlyInitialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        if (broker) {
-            // Must create the HaBroker in earlyInitialize so it can set up its
-            // connection observer before clients start connecting.
-            haBroker.reset(new ha::HaBroker(*broker, settings));
-            broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
+        if (broker && (settings.cluster || settings.queueReplication)) {
+            if (!broker->getManagementAgent()) {
+                QPID_LOG(info, "HA plugin disabled because management is disabled");
+                if (settings.cluster)
+                    throw Exception("Cannot start HA: management is disabled");
+            } else {
+                // Must create the HaBroker in earlyInitialize so it can set up its
+                // connection observer before clients start connecting.
+                haBroker.reset(new ha::HaBroker(*broker, settings));
+                broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
+            }
         }
     }
 
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        if (broker) haBroker->initialize();
+        if (broker && haBroker.get()) haBroker->initialize();
     }
 
     void finalize() {

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp Thu Feb 28 16:14:30 2013
@@ -19,6 +19,12 @@
  *
  */
 #include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include <boost/bind.hpp>
 #include <iostream>
 #include <iterator>
@@ -26,37 +32,58 @@
 namespace qpid {
 namespace ha {
 
+namespace _qmf = ::qmf::org::apache::qpid::ha;
 
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+    : haBroker(b), self(info.getSystemId())
+{
+    brokers[self] = info;
+}
+
+void Membership::clear() {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo me = brokers[self];
     brokers.clear();
-    brokers[b.getSystemId()] = b;
+    brokers[self] = me;
 }
 
 void Membership::add(const BrokerInfo& b) {
+    Mutex::ScopedLock l(lock);
     brokers[b.getSystemId()] = b;
+    update(l);
 }
 
 
 void Membership::remove(const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
+    if (id == self) return;     // Never remove myself
     BrokerInfo::Map::iterator i = brokers.find(id);
     if (i != brokers.end()) {
         brokers.erase(i);
-        }
+        update(l);
+    }
 }
 
 bool Membership::contains(const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
     return brokers.find(id) != brokers.end();
 }
 
 void Membership::assign(const types::Variant::List& list) {
-    brokers.clear();
+    Mutex::ScopedLock l(lock);
+    clear();
     for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
         BrokerInfo b(i->asMap());
         brokers[b.getSystemId()] = b;
     }
+    update(l);
 }
 
 types::Variant::List Membership::asList() const {
+    Mutex::ScopedLock l(lock);
     types::Variant::List list;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         list.push_back(i->second.asMap());
@@ -64,6 +91,7 @@ types::Variant::List Membership::asList(
 }
 
 BrokerInfo::Set Membership::otherBackups() const {
+    Mutex::ScopedLock l(lock);
     BrokerInfo::Set result;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +99,84 @@ BrokerInfo::Set Membership::otherBackups
     return result;
 }
 
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
-    BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Map::const_iterator i = brokers.find(id);
     if (i == brokers.end()) return false;
     result = i->second;
     return true;
 }
 
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
-    return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+    QPID_LOG(info, "Membership: " <<  brokers);
+    Variant::List brokers = asList();
+    if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+    if (mgmtObject) mgmtObject->set_members(brokers);
+    haBroker.getBroker().getManagementAgent()->raiseEvent(
+        _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+    Mutex::ScopedLock l(lock);
+    mgmtObject = mo;
+    update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+    static const BrokerStatus TRANSITIONS[][2] = {
+        { STANDALONE, JOINING }, // Initialization of backup broker
+        { JOINING, CATCHUP },    // Connected to primary
+        { JOINING, RECOVERING }, // Chosen as initial primary.
+        { CATCHUP, READY },      // Caught up all queues, ready to take over.
+        { READY, RECOVERING },   // Chosen as new primary
+        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
+        { RECOVERING, ACTIVE }   // All expected backups are ready
+    };
+    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+    for (size_t i = 0; i < N; ++i) {
+        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+            return true;
+    }
+    return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+    BrokerStatus status = getStatus();
+    QPID_LOG(info, "Status change: "
+             << printable(status) << " -> " << printable(newStatus));
+    bool legal = checkTransition(status, newStatus);
+    if (!legal) {
+        haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+                                 << " -> " << printable(newStatus)));
+    }
+
+    Mutex::ScopedLock l(lock);
+    brokers[self].setStatus(newStatus);
+    if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+    update(l);
+}
+
+BrokerStatus Membership::getStatus() const  {
+    Mutex::ScopedLock l(lock);
+    return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const  {
+    BrokerInfo::Map::const_iterator i = brokers.find(self);
+    assert(i != brokers.end());
+    return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const  {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Map::const_iterator i = brokers.find(self);
+    assert(i != brokers.end());
+    return i->second;
 }
 
+// FIXME aconway 2013-01-23: move to .h?
 }} // namespace qpid::ha

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h Thu Feb 28 16:14:30 2013
@@ -24,45 +24,72 @@
 
 #include "BrokerInfo.h"
 #include "types.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/types/Variant.h"
 #include <boost/function.hpp>
 #include <set>
 #include <vector>
 #include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
 namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
 namespace ha {
+class HaBroker;
 
 /**
  * Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management when events on membership changes.
+ * THREAD SAFE
  */
 class Membership
 {
   public:
-    Membership(const types::Uuid& self_) : self(self_) {}
+    Membership(const BrokerInfo& info, HaBroker&);
+
+    void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
 
-    void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+    void clear();               ///< Clear all but self.
     void add(const BrokerInfo& b);
     void remove(const types::Uuid& id);
     bool contains(const types::Uuid& id);
+
     /** Return IDs of all READY backups other than self */
     BrokerInfo::Set otherBackups() const;
 
     void assign(const types::Variant::List&);
     types::Variant::List asList() const;
 
-    bool get(const types::Uuid& id, BrokerInfo& result);
+    bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+    types::Uuid getSelf() const  { return self; }
+    BrokerInfo getInfo() const;
+    BrokerStatus getStatus() const;
+    void setStatus(BrokerStatus s);
 
   private:
-    types::Uuid self;
+    void update(sys::Mutex::ScopedLock&);
+    BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+    mutable sys::Mutex lock;
+    HaBroker& haBroker;
+    boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+    const types::Uuid self;
     BrokerInfo::Map brokers;
-    friend std::ostream& operator<<(std::ostream&, const Membership&);
 };
 
-std::ostream& operator<<(std::ostream&, const Membership&);
-
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_MEMBERSHIP_H*/



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org