You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/04/23 17:51:34 UTC

svn commit: r1329300 - in /qpid/trunk/qpid/cpp/src: qpid/ha/Backup.cpp qpid/ha/BrokerReplicator.cpp tests/ha_tests.py

Author: aconway
Date: Mon Apr 23 15:51:33 2012
New Revision: 1329300

URL: http://svn.apache.org/viewvc?rev=1329300&view=rev
Log:
QPID-3603: Install HA connection excluder at the beginning of initialization.

Previously the excluder was being installed late, allowing connections to a
backup and diverting backups from the real primary.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1329300&r1=1329299&r2=1329300&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Mon Apr 23 15:51:33 2012
@@ -47,6 +47,8 @@ using std::string;
 Backup::Backup(HaBroker& hb, const Settings& s) :
     haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
 {
+    // Exclude client connections before starting the link to avoid self-connection.
+    broker.getConnectionObservers().add(excluder);
     // Empty brokerUrl means delay initialization until setUrl() is called.
     if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
 }
@@ -62,12 +64,18 @@ void Backup::initialize(const Url& url) 
         settings.mechanism, settings.username, settings.password);
     link = result.first;
     link->setUrl(url);
-
     replicator.reset(new BrokerReplicator(haBroker, link));
     broker.getExchanges().registerExchange(replicator);
-    broker.getConnectionObservers().add(excluder);
 }
 
+Backup::~Backup() {
+    if (link) link->close();
+    if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+    replicator.reset();
+    broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+
 void Backup::setBrokerUrl(const Url& url) {
     // Ignore empty URLs seen during start-up for some tests.
     if (url.empty()) return;
@@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url
     }
 }
 
-Backup::~Backup() {
-    if (link) link->close();
-    if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
-    broker.getConnectionObservers().remove(excluder); // This allows client connections.
-}
-
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1329300&r1=1329299&r2=1329300&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Apr 23 15:51:33 2012
@@ -190,8 +190,6 @@ BrokerReplicator::BrokerReplicator(HaBro
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
       haBroker(hb), broker(hb.getBroker()), link(l)
 {
-    QPID_LOG(info, "HA: Backup replicating from " <<
-             link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
     broker.getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
@@ -230,7 +228,7 @@ void BrokerReplicator::initializeBridge(
     sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+    QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName);
 }
 
 void BrokerReplicator::route(Deliverable& msg) {
@@ -246,6 +244,7 @@ void BrokerReplicator::route(Deliverable
         if (headers->getAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
+                QPID_LOG(trace, "HA: Backup received event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
                 if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
@@ -257,8 +256,10 @@ void BrokerReplicator::route(Deliverable
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
-                string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
-                Variant::Map& values = i->asMap()[VALUES].asMap();
+                Variant::Map& map = i->asMap();
+                QPID_LOG(trace, "HA: Backup received event: " << map);
+                string type = map[SCHEMA_ID].asMap()[CLASS_NAME];
+                Variant::Map& values = map[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
                 if      (type == QUEUE) doResponseQueue(values);
@@ -268,14 +269,13 @@ void BrokerReplicator::route(Deliverable
             }
         }
     } catch (const std::exception& e) {
-        QPID_LOG(critical, "HA: Backup configuration replication failed: " << e.what()
+        QPID_LOG(critical, "HA: Backup configuration failed: " << e.what()
                  << ": while handling: " << list);
         throw;
     }
 }
 
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup queue declare event " << values);
     string name = values[QNAME].asString();
     Variant::Map argsMap = asMapVoid(values[ARGS]);
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
@@ -292,26 +292,27 @@ void BrokerReplicator::doEventQueueDecla
                 values[USER].asString(),
                 values[RHOST].asString());
         if (result.second) {
-            // FIXME aconway 2011-11-22: should delete old queue and
-            // re-create from event.
-            // Events are always up to date, whereas responses may be
-            // out of date.
+            QPID_LOG(debug, "HA: Backup queue declare event: " << name);
             startQueueReplicator(result.first);
         } else {
             // FIXME aconway 2011-12-02: what's the right way to handle this?
-            QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+            // Should we delete the old & re-create form the event? Responses
+            // may be old but events are always up-to-date.
+            QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name);
         }
     }
 }
 
 void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup queue delete event " << values);
     // The remote queue has already been deleted so replicator
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (queue && replicateLevel(queue->getSettings())) {
-        QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+    if (!queue) {
+        QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name);
+    } else if (!replicateLevel(queue->getSettings())) {
+        QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name);
+    } else {
         string rname = QueueReplicator::replicatorName(name);
         boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
         boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
@@ -320,11 +321,11 @@ void BrokerReplicator::doEventQueueDelet
         // actually be destroyed, deleting the exhange
         broker.getExchanges().destroy(rname);
         broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+        QPID_LOG(debug, "HA: Backup queue delete event: " << name);
     }
 }
 
 void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup exchange declare event " << values);
     Variant::Map argsMap(asMapVoid(values[ARGS]));
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
@@ -339,32 +340,32 @@ void BrokerReplicator::doEventExchangeDe
                 values[USER].asString(),
                 values[RHOST].asString()).second)
         {
-                    QPID_LOG(debug, "HA: Backup created exchange: " << name);
+            QPID_LOG(debug, "HA: Backup exchange declare event: " << name);
         } else {
-            // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+            // FIXME aconway 2011-11-22: should delete pre-existing exchange
             // and re-create from event. See comment in doEventQueueDeclare.
-            QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+            QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name);
         }
     }
 }
 
 void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup exchange delete event " << values);
     string name = values[EXNAME].asString();
-    try {
-        boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
-        if (exchange && replicateLevel(exchange->getArgs())) {
-            QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
-            broker.deleteExchange(
-                name,
-                values[USER].asString(),
-                values[RHOST].asString());
-        }
-    } catch (const framing::NotFoundException&) {}
+    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+    if (!exchange) {
+        QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name);
+    } else if (!replicateLevel(exchange->getArgs())) {
+        QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name);
+    } else {
+        QPID_LOG(debug, "HA: Backup exchange delete event:" << name);
+        broker.deleteExchange(
+            name,
+            values[USER].asString(),
+            values[RHOST].asString());
+    }
 }
 
 void BrokerReplicator::doEventBind(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup bind event " << values);
     boost::shared_ptr<Exchange> exchange =
         broker.getExchanges().find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
@@ -377,15 +378,14 @@ void BrokerReplicator::doEventBind(Varia
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+        exchange->bind(queue, key, &args);
+        QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->bind(queue, key, &args);
     }
 }
 
 void BrokerReplicator::doEventUnbind(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup unbind event " << values);
     boost::shared_ptr<Exchange> exchange =
         broker.getExchanges().find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
@@ -398,15 +398,14 @@ void BrokerReplicator::doEventUnbind(Var
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+        exchange->unbind(queue, key, &args);
+        QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->unbind(queue, key, &args);
     }
 }
 
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup queue response " << values);
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
@@ -423,16 +422,16 @@ void BrokerReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
+        QPID_LOG(debug, "HA: Backup queue response: " << name);
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
-        QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+        QPID_LOG(warning, "HA: Backup queue response, already exists: " << name);
     }
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup exchange response " << values);
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
@@ -446,9 +445,9 @@ void BrokerReplicator::doResponseExchang
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/).second)
     {
-        QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME].asString());
+        QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString());
     } else {
-        QPID_LOG(warning, "HA: Backup catch-up exchange already exists:  " <<
+        QPID_LOG(warning, "HA: Backup exchange query, already exists: " <<
                  values[QNAME].asString());
     }
 }
@@ -475,7 +474,6 @@ const std::string QUEUE_REF("queueRef");
 } // namespace
 
 void BrokerReplicator::doResponseBind(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup bind response " << values);
     std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
     std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
     boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
@@ -489,7 +487,7 @@ void BrokerReplicator::doResponseBind(Va
         amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
         string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
-        QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+        QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
     }

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1329300&r1=1329299&r2=1329300&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr 23 15:51:33 2012
@@ -34,7 +34,7 @@ class HaBroker(Broker):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=info+", "--log-enable=trace+:ha::", # FIXME aconway 2012-04-18: trace
+                 "--log-enable=info+", "--log-enable=debug+:ha::",
                  # FIXME aconway 2012-02-13: workaround slow link failover.
                  "--link-maintenace-interval=0.1",
                  "--ha-cluster=%s"%ha_cluster]
@@ -325,7 +325,7 @@ class ReplicationTests(BrokerTest):
         """Verify that a backup broker fails over and recovers queue state"""
         brokers = HaCluster(self, 3)
         brokers[0].connect().session().sender("q;{create:always}").send("a")
-        for b in brokers[1:]: b.assert_browse_backup("q", ["a"])
+        for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b)
         brokers[0].expect = EXPECT_EXIT_FAIL
         brokers.kill(0)
         brokers[1].connect().session().sender("q").send("b")



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org