You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/04/29 17:57:59 UTC
svn commit: r1477165 - in /qpid/trunk/qpid/cpp/src: qpid/ha/BrokerInfo.cpp
qpid/ha/ConnectionObserver.cpp qpid/ha/ConnectionObserver.h
qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/StatusCheck.cpp
tests/ha_tests.py
Author: aconway
Date: Mon Apr 29 15:57:59 2013
New Revision: 1477165
URL: http://svn.apache.org/r1477165
Log:
QPID-4787: HA brokers find self-address in brokers_url.
HA brokers need to know their own addresses, but it is not safe to simply use
local hosts name and Broker::getPort() since the broker may be listening on
multiple addresses. The solution is to have brokers check the ha-rokers-url for
their own address while doing the initial status check of the cluster.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Mon Apr 29 15:57:59 2013
@@ -89,8 +89,12 @@ void BrokerInfo::assign(const Variant::M
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
- return o << b.getHostName() << ":" << b.getPort() << "("
- << printable(b.getStatus()) << ")";
+ o << "FIXME:";
+ o << b.getSystemId().str().substr(0,7);
+ if (!b.getHostName().empty())
+ o << "@" << b.getHostName() << ":" << b.getPort();
+ o << "(" << printable(b.getStatus()) << ")";
+ return o;
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Mon Apr 29 15:57:59 2013
@@ -22,6 +22,7 @@
#include "ConnectionObserver.h"
#include "BrokerInfo.h"
#include "HaBroker.h"
+#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -41,6 +42,17 @@ bool ConnectionObserver::getBrokerInfo(c
return false;
}
+bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) {
+ Url url;
+ url.parseNoThrow(
+ connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str());
+ if (!url.empty()) {
+ addr = url[0];
+ return true;
+ }
+ return false;
+}
+
void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
{
sys::Mutex::ScopedLock l(lock);
@@ -60,17 +72,20 @@ bool ConnectionObserver::isSelf(const br
void ConnectionObserver::opened(broker::Connection& connection) {
try {
+ if (isSelf(connection)) { // Reject self connections
+ // Set my own address if there is an address header.
+ Address addr;
+ if (getAddress(connection, addr)) haBroker.setAddress(addr);
+ QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ connection.abort();
+ return;
+ }
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().isSet(ADMIN_TAG)) {
QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
- if (isSelf(connection)) { // Reject self connections
- QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
- connection.abort();
- return;
- }
ObserverPtr o(getObserver());
if (o) o->opened(connection);
}
@@ -94,5 +109,6 @@ void ConnectionObserver::closed(broker::
const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup";
+const std::string ConnectionObserver::ADDRESS_TAG="qpid.ha-address";
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h Mon Apr 29 15:57:59 2013
@@ -29,6 +29,8 @@
#include "boost/shared_ptr.hpp"
namespace qpid {
+class Address;
+
namespace ha {
class BrokerInfo;
class HaBroker;
@@ -50,8 +52,10 @@ class ConnectionObserver : public broker
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
+ static const std::string ADDRESS_TAG;
- static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
+ static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo&);
+ static bool getAddress(const broker::Connection& connection, Address&);
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Apr 29 15:57:59 2013
@@ -84,15 +84,7 @@ bool isNone(const std::string& x) { retu
// Called in Plugin::initialize
void HaBroker::initialize() {
- // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
- membership.add(
- BrokerInfo(
- membership.getSelf(),
- settings.cluster ? JOINING : membership.getStatus(),
- broker.getSystem()->getNodeName(),
- broker.getPort(broker::Broker::TCP_TRANSPORT)
- )
- );
+ if (settings.cluster) membership.setStatus(JOINING);
QPID_LOG(notice, "Initializing: " << membership.getInfo());
// Set up the management object.
@@ -207,4 +199,11 @@ BrokerStatus HaBroker::getStatus() const
return membership.getStatus();
}
+void HaBroker::setAddress(const Address& a) {
+ QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+ BrokerInfo b(membership.getSelf(), membership.getStatus(), a.host, a.port);
+ membership.add(b);
+}
+
+
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Apr 29 15:57:59 2013
@@ -90,6 +90,8 @@ class HaBroker : public management::Mana
Membership& getMembership() { return membership; }
types::Uuid getSystemId() const { return systemId; }
+ void setAddress(const Address&); // set self address from a self-connection
+
private:
void setPublicUrl(const Url&);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Mon Apr 29 15:57:59 2013
@@ -19,6 +19,7 @@
*
*/
#include "StatusCheck.h"
+#include "ConnectionObserver.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
@@ -55,7 +56,9 @@ void StatusCheckThread::run() {
try {
Variant::Map options, clientProperties;
clientProperties = brokerInfo.asMap(); // Detect self connections.
- clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
options["client-properties"] = clientProperties;
options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1477165&r1=1477164&r2=1477165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr 29 15:57:59 2013
@@ -545,26 +545,25 @@ class ReplicationTests(HaBrokerTest):
"""Check that broker information is correctly published via management"""
cluster = HaCluster(self, 3)
+ def ha_broker(broker):
+ ha_broker = broker.agent().getHaBroker();
+ ha_broker.update()
+ return ha_broker
+
for broker in cluster: # Make sure HA system-id matches broker's
- qmf = broker.agent().getHaBroker()
- self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+ self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef))
- cluster_ports = map(lambda b: b.port(), cluster)
- cluster_ports.sort()
- def ports(qmf):
- qmf.update()
- return sorted(map(lambda b: b["port"], qmf.members))
# Check that all brokers have the same membership as the cluster
- for broker in cluster:
- qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+ def check_ids(broker):
+ cluster_ids = set([ ha_broker(b).systemId for b in cluster])
+ broker_ids = set([m["system-id"] for m in ha_broker(broker).members])
+ assert retry(lambda: cluster_ids == broker_ids, 1), "%s != %s on %s"%(cluster_ids, broker_ids, broker)
+
+ for broker in cluster: check_ids(broker)
+
# Add a new broker, check it is updated everywhere
b = cluster.start()
- cluster_ports.append(b.port())
- cluster_ports.sort()
- for broker in cluster:
- qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ for broker in cluster: check_ids(broker)
def test_auth(self):
"""Verify that authentication does not interfere with replication."""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org