You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/08/07 17:19:37 UTC

svn commit: r1370308 - in /qpid/branches/0.18/qpid/cpp/src: qpid/ha/ tests/

Author: aconway
Date: Tue Aug  7 15:19:36 2012
New Revision: 1370308

URL: http://svn.apache.org/viewvc?rev=1370308&view=rev
Log:
QPID-4191: HA removing self address breaks if a VIP is used.

Before this patch, the HA broker removed its own address from the set of cluster
addresses to form the set of failover addresses. The goal was to avoid useless
self-connection attempts. However, this broke when a Virtual IP address is used,
where a single address is used for the entire cluster.

Removing the self-address is not essential, because self-connection attempts are
prevented elsewhere. Backup brokers are prevented from connecting to themselves by
the same connection observer as normal clients, and this patch adds self-connection
checks in ConnectionObserver.

This patch
- removes the code to remove self-addresses
- adds self-connection checks in ConnectionObserver
- adds & reorders some log statements & comments for greater clarity.

Modified:
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.cpp Tue Aug  7 15:19:36 2012
@@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Setti
     if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
 }
 
-bool Backup::isSelf(const Address& a) const {
-    return sys::SystemInfo::isLocalHost(a.host) &&
-        a.port == haBroker.getBroker().getPort(a.protocol);
-}
-
-// Remove my own address from the URL if possible.
-// This isn't 100% reliable given the many ways to specify a host,
-// but should work in most cases. We have additional measures to prevent
-// self-connection in ConnectionObserver
-Url Backup::removeSelf(const Url& brokers) const {
-    Url url;
-    for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
-        if (!isSelf(*i)) url.push_back(*i);
-    if (url.empty())
-        throw Url::Invalid(logPrefix+"Failover URL is empty");
-    QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
-    return url;
-}
-
 void Backup::initialize(const Url& brokers) {
     if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
     QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
-    Url url = removeSelf(brokers);
-    string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+    string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
     types::Uuid uuid(true);
     // Declare the link
     std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
         broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
-        url[0].host, url[0].port, protocol,
+        brokers[0].host, brokers[0].port, protocol,
         false,                  // durable
         settings.mechanism, settings.username, settings.password,
         false);               // no amq.failover - don't want to use client URL.
@@ -90,7 +70,7 @@ void Backup::initialize(const Url& broke
         replicator->initialize();
         broker.getExchanges().registerExchange(replicator);
     }
-    link->setUrl(url);          // Outside the lock, once set link doesn't change.
+    link->setUrl(brokers);          // Outside the lock, once set link doesn't change.
 }
 
 Backup::~Backup() {
@@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url
         sys::Mutex::ScopedLock l(lock);
         linkSet = link;
     }
-    if (linkSet) {
-        QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
-        link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change
-    }
+    if (linkSet)
+        link->setUrl(url);      // Outside lock, once set link doesn't change
     else
         initialize(url);        // Deferred initialization
 }

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.h?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/Backup.h Tue Aug  7 15:19:36 2012
@@ -53,8 +53,6 @@ class Backup
     void setStatus(BrokerStatus);
 
   private:
-    bool isSelf(const Address& a) const;
-    Url removeSelf(const Url&) const;
     void initialize(const Url&);
     std::string logPrefix;
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h Tue Aug  7 15:19:36 2012
@@ -36,7 +36,7 @@ class BackupConnectionExcluder : public 
 {
   public:
     void opened(broker::Connection& connection) {
-        QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId());
+        QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
         connection.abort();
     }
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Aug  7 15:19:36 2012
@@ -472,7 +472,8 @@ void BrokerReplicator::doResponseExchang
     boost::shared_ptr<Exchange> exchange = createExchange(
         name, values[TYPE].asString(), values[DURABLE].asBool(), args,
         getAltExchange(values[ALTEXCHANGE]));
-    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
+    // It is normal for the exchange to already exist if we are failing over.
+    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
 }
 
 namespace {

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Tue Aug  7 15:19:36 2012
@@ -32,7 +32,7 @@ namespace ha {
 ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
     : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
 
-bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
     framing::FieldTable ft;
     if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
         info = BrokerInfo(ft);
@@ -51,29 +51,43 @@ ConnectionObserver::ObserverPtr Connecti
     return observer;
 }
 
+bool ConnectionObserver::isSelf(const broker::Connection& connection) {
+    BrokerInfo info;
+    return getBrokerInfo(connection, info) && info.getSystemId() == self;
+}
+
 void ConnectionObserver::opened(broker::Connection& connection) {
-    if (connection.isLink()) return; // Allow outgoing links.
-    if (connection.getClientProperties().isSet(ADMIN_TAG)) {
-        QPID_LOG(debug, logPrefix << "Allowing admin connection: "
-                 << connection.getMgmtId());
-        return;                 // No need to call observer, always allow admins.
-    }
-    BrokerInfo info;            // Avoid self connections.
-    if (getBrokerInfo(connection, info)) {
-        if (info.getSystemId() == self) {
-            QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
+    try {
+        if (connection.isLink()) return; // Allow outgoing links.
+        if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+            QPID_LOG(debug, logPrefix << "Accepted admin connection: "
+                     << connection.getMgmtId());
+            return;                 // No need to call observer, always allow admins.
+        }
+        if (isSelf(connection)) { // Reject self connections
+            QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
             connection.abort();
+            return;
         }
-
+        ObserverPtr o(getObserver());
+        if (o) o->opened(connection);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Open error: " << e.what());
+        throw;
     }
-    ObserverPtr o(getObserver());
-    if (o) o->opened(connection);
 }
 
 void ConnectionObserver::closed(broker::Connection& connection) {
-    BrokerInfo info;
-    ObserverPtr o(getObserver());
-    if (o) o->closed(connection);
+    if (isSelf(connection)) return; // Ignore closing of self connections.
+    try {
+        ObserverPtr o(getObserver());
+        if (o) o->closed(connection);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Close error: " << e.what());
+        throw;
+    }
 }
 
 const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.h?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/ConnectionObserver.h Tue Aug  7 15:19:36 2012
@@ -51,7 +51,7 @@ class ConnectionObserver : public broker
     static const std::string ADMIN_TAG;
     static const std::string BACKUP_TAG;
 
-    static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+    static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
 
     ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
 
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker
     void closed(broker::Connection& connection);
 
   private:
+    bool isSelf(const broker::Connection&);
+
     sys::Mutex lock;
     HaBroker& haBroker;
     std::string logPrefix;

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Aug  7 15:19:36 2012
@@ -83,7 +83,11 @@ void HaBroker::initialize() {
 
     // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
     brokerInfo = BrokerInfo(
-        broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+        broker.getSystem()->getNodeName(),
+        broker.getPort(broker::Broker::TCP_TRANSPORT),
+        systemId);
+
+    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
@@ -111,8 +115,6 @@ void HaBroker::initialize() {
     if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
 
 
-    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
-
     // NOTE: lock is not needed in a constructor, but create one
     // to pass to functions that have a ScopedLock parameter.
     Mutex::ScopedLock l(lock);
@@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& u
     if (url.empty()) throw Url::Invalid("HA broker URL is empty");
     brokerUrl = url;
     mgmtObject->set_brokersUrl(brokerUrl.str());
+    QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
     if (backup.get()) backup->setBrokerUrl(brokerUrl);
     // Updating broker URL also updates defaulted client URL:
     if (clientUrl.empty()) updateClientUrl(l);
@@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::Scop
 }
 
 void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+    QPID_LOG(info, logPrefix << "Membership changed: " <<  membership);
     Variant::List brokers = membership.asList();
     mgmtObject->set_members(brokers);
     broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -321,14 +325,14 @@ void HaBroker::resetMembership(const Bro
 void HaBroker::addBroker(const BrokerInfo& b) {
     Mutex::ScopedLock l(lock);
     membership.add(b);
-    QPID_LOG(debug, logPrefix << "Membership add: " <<  b << " now: " << membership);
+    QPID_LOG(debug, logPrefix << "Membership add: " <<  b);
     membershipUpdated(l);
 }
 
 void HaBroker::removeBroker(const Uuid& id) {
     Mutex::ScopedLock l(lock);
     membership.remove(id);
-    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id << " now: " << membership);
+    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id);
     membershipUpdated(l);
 }
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.h Tue Aug  7 15:19:36 2012
@@ -97,6 +97,8 @@ class HaBroker : public management::Mana
     void addBroker(const BrokerInfo& b);       // Add a broker to the membership.
     void removeBroker(const types::Uuid& id);  // Remove a broker from membership.
 
+    types::Uuid getSystemId() const { return systemId; }
+
   private:
     void setClientUrl(const Url&);
     void setBrokerUrl(const Url&);

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp Tue Aug  7 15:19:36 2012
@@ -190,11 +190,13 @@ void Primary::queueDestroy(const QueuePt
 }
 
 void Primary::opened(broker::Connection& connection) {
+    QPID_LOG(critical, "FIXME opened " << connection.getMgmtId());
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
         Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
+            QPID_LOG(debug, logPrefix << "New backup connected: " << info);
             boost::shared_ptr<RemoteBackup> backup(
                 new RemoteBackup(info, haBroker.getReplicationTest(), true));
             {
@@ -203,7 +205,6 @@ void Primary::opened(broker::Connection&
                 backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
             }
             backups[info.getSystemId()] = backup;
-            QPID_LOG(debug, logPrefix << "New backup connected: " << info);
         }
         else {
             QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
@@ -219,6 +220,12 @@ void Primary::opened(broker::Connection&
 }
 
 void Primary::closed(broker::Connection& connection) {
+    // NOTE: It is possible for a backup connection to be rejected while we are
+    // a backup, but closed() is called after we have become primary.
+    //
+    // For this reason we do not remove from the backups map here, the backups
+    // map holds all the backups we know about whether connected or not.
+    //
     Mutex::ScopedLock l(lock);
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
@@ -227,12 +234,6 @@ void Primary::closed(broker::Connection&
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i != backups.end()) i->second->setConnected(false);
     }
-    // NOTE: we do not remove from the backups map here, the backups map holds
-    // all the backups we know about whether connected or not.
-    //
-    // It is possible for a backup connection to be rejected while we are a backup,
-    // but the closed is seen after we have become primary. Removing the entry
-    // from backups in this case would be incorrect.
 }
 
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Tue Aug  7 15:19:36 2012
@@ -32,7 +32,7 @@ using sys::Mutex;
 using boost::bind;
 
 RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
-    logPrefix("Primary remote backup "+info.getLogId()+": "),
+    logPrefix("Primary: Remote backup "+info.getLogId()+": "),
     brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
 {}
 

Modified: qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py?rev=1370308&r1=1370307&r2=1370308&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py Tue Aug  7 15:19:36 2012
@@ -63,7 +63,6 @@ class HaBroker(Broker):
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
                  "--log-enable=debug+:ha::",
-                 "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: 
                  # FIXME aconway 2012-02-13: workaround slow link failover.
                  "--link-maintenace-interval=0.1",
                  "--ha-cluster=%s"%ha_cluster]
@@ -112,9 +111,11 @@ class HaBroker(Broker):
     def wait_status(self, status):
         def try_get_status():
             # Ignore ConnectionError, the broker may not be up yet.
-            try: return self.ha_status() == status;
+            try:
+                self._status = self.ha_status()
+                return self._status == status;
             except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
+        assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
 
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
@@ -761,7 +762,7 @@ acl deny all all
         s1.sender("ex").send("foo");
         self.assertEqual(s1.receiver("q").fetch().content, "foo")
 
-    def test_alterante_exchange(self):
+    def test_alternate_exchange(self):
         """Verify that alternate-exchange on exchanges and queues is propagated
         to new members of a cluster. """
         cluster = HaCluster(self, 2)
@@ -964,7 +965,7 @@ class RecoveryTests(BrokerTest):
         """
         cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
         cluster[0].wait_status("active") # Primary ready
-        for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+        for b in cluster[1:3]: b.wait_status("ready") # Backups ready
         for i in [0,1]: cluster.kill(i, False)
         cluster[2].promote()    # New primary, backups will be 1 and 2
         cluster[2].wait_status("recovering")



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org