You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/04/29 17:57:59 UTC

svn commit: r1477165 - in /qpid/trunk/qpid/cpp/src: qpid/ha/BrokerInfo.cpp qpid/ha/ConnectionObserver.cpp qpid/ha/ConnectionObserver.h qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/StatusCheck.cpp tests/ha_tests.py

Author: aconway
Date: Mon Apr 29 15:57:59 2013
New Revision: 1477165

URL: http://svn.apache.org/r1477165
Log:
QPID-4787: HA brokers find self-address in brokers_url.

HA brokers need to know their own addresses, but it is not safe to simply use
the local host name and Broker::getPort(), since the broker may be listening on
multiple addresses. The solution is to have brokers check the ha-brokers-url for
their own address while doing the initial status check of the cluster.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Mon Apr 29 15:57:59 2013
@@ -89,8 +89,12 @@ void BrokerInfo::assign(const Variant::M
 }
 
 std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
-    return o << b.getHostName() << ":" << b.getPort() << "("
-             << printable(b.getStatus()) << ")";
+    o << "FIXME:";
+    o  << b.getSystemId().str().substr(0,7);
+    if (!b.getHostName().empty())
+        o << "@" << b.getHostName() << ":" << b.getPort();
+    o << "(" << printable(b.getStatus()) << ")";
+    return o;
 }
 
 std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Mon Apr 29 15:57:59 2013
@@ -22,6 +22,7 @@
 #include "ConnectionObserver.h"
 #include "BrokerInfo.h"
 #include "HaBroker.h"
+#include "qpid/Url.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
@@ -41,6 +42,17 @@ bool ConnectionObserver::getBrokerInfo(c
     return false;
 }
 
+bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) {
+    Url url;
+    url.parseNoThrow(
+        connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str());
+    if (!url.empty()) {
+        addr = url[0];
+        return true;
+    }
+    return false;
+}
+
 void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
 {
     sys::Mutex::ScopedLock l(lock);
@@ -60,17 +72,20 @@ bool ConnectionObserver::isSelf(const br
 
 void ConnectionObserver::opened(broker::Connection& connection) {
     try {
+        if (isSelf(connection)) { // Reject self connections
+            // Set my own address if there is an address header.
+            Address addr;
+            if (getAddress(connection, addr)) haBroker.setAddress(addr);
+            QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+            connection.abort();
+            return;
+        }
         if (connection.isLink()) return; // Allow outgoing links.
         if (connection.getClientProperties().isSet(ADMIN_TAG)) {
             QPID_LOG(debug, logPrefix << "Accepted admin connection: "
                      << connection.getMgmtId());
             return;                 // No need to call observer, always allow admins.
         }
-        if (isSelf(connection)) { // Reject self connections
-            QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
-            connection.abort();
-            return;
-        }
         ObserverPtr o(getObserver());
         if (o) o->opened(connection);
     }
@@ -94,5 +109,6 @@ void ConnectionObserver::closed(broker::
 
 const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
 const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup";
+const std::string ConnectionObserver::ADDRESS_TAG="qpid.ha-address";
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h Mon Apr 29 15:57:59 2013
@@ -29,6 +29,8 @@
 #include "boost/shared_ptr.hpp"
 
 namespace qpid {
+class Address;
+
 namespace ha {
 class BrokerInfo;
 class HaBroker;
@@ -50,8 +52,10 @@ class ConnectionObserver : public broker
 
     static const std::string ADMIN_TAG;
     static const std::string BACKUP_TAG;
+    static const std::string ADDRESS_TAG;
 
-    static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
+    static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo&);
+    static bool getAddress(const broker::Connection& connection, Address&);
 
     ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Apr 29 15:57:59 2013
@@ -84,15 +84,7 @@ bool isNone(const std::string& x) { retu
 
 // Called in Plugin::initialize
 void HaBroker::initialize() {
-    // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
-    membership.add(
-        BrokerInfo(
-            membership.getSelf(),
-            settings.cluster ? JOINING : membership.getStatus(),
-            broker.getSystem()->getNodeName(),
-            broker.getPort(broker::Broker::TCP_TRANSPORT)
-        )
-    );
+    if (settings.cluster) membership.setStatus(JOINING);
     QPID_LOG(notice, "Initializing: " << membership.getInfo());
 
     // Set up the management object.
@@ -207,4 +199,11 @@ BrokerStatus HaBroker::getStatus() const
     return membership.getStatus();
 }
 
+void HaBroker::setAddress(const Address& a) {
+    QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+    BrokerInfo b(membership.getSelf(), membership.getStatus(), a.host, a.port);
+    membership.add(b);
+}
+
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Apr 29 15:57:59 2013
@@ -90,6 +90,8 @@ class HaBroker : public management::Mana
     Membership& getMembership() { return membership; }
     types::Uuid getSystemId() const { return systemId; }
 
+    void setAddress(const Address&); // set self address from a self-connection
+
   private:
 
     void setPublicUrl(const Url&);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Mon Apr 29 15:57:59 2013
@@ -19,6 +19,7 @@
  *
  */
 #include "StatusCheck.h"
+#include "ConnectionObserver.h"
 #include "qpid/log/Statement.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
@@ -55,7 +56,9 @@ void StatusCheckThread::run() {
     try {
         Variant::Map options, clientProperties;
         clientProperties = brokerInfo.asMap(); // Detect self connections.
-        clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
+        clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+        clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
+        clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
 
         options["client-properties"] = clientProperties;
         options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr 29 15:57:59 2013
@@ -545,26 +545,25 @@ class ReplicationTests(HaBrokerTest):
         """Check that broker information is correctly published via management"""
         cluster = HaCluster(self, 3)
 
+        def ha_broker(broker):
+            ha_broker = broker.agent().getHaBroker();
+            ha_broker.update()
+            return ha_broker
+
         for broker in cluster:  # Make sure HA system-id matches broker's
-            qmf = broker.agent().getHaBroker()
-            self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+            self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef))
 
-        cluster_ports = map(lambda b: b.port(), cluster)
-        cluster_ports.sort()
-        def ports(qmf):
-            qmf.update()
-            return sorted(map(lambda b: b["port"], qmf.members))
         # Check that all brokers have the same membership as the cluster
-        for broker in cluster:
-            qmf = broker.agent().getHaBroker()
-            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+        def check_ids(broker):
+            cluster_ids = set([ ha_broker(b).systemId for b in cluster])
+            broker_ids = set([m["system-id"] for m in  ha_broker(broker).members])
+            assert retry(lambda: cluster_ids == broker_ids, 1), "%s != %s on %s"%(cluster_ids, broker_ids, broker)
+
+        for broker in cluster: check_ids(broker)
+
         # Add a new broker, check it is updated everywhere
         b = cluster.start()
-        cluster_ports.append(b.port())
-        cluster_ports.sort()
-        for broker in cluster:
-            qmf = broker.agent().getHaBroker()
-            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+        for broker in cluster: check_ids(broker)
 
     def test_auth(self):
         """Verify that authentication does not interfere with replication."""



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org