You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/01/15 20:28:45 UTC
svn commit: r1558538 - in /qpid/trunk/qpid/cpp/src/qpid/ha:
BrokerReplicator.cpp BrokerReplicator.h Membership.cpp StatusCheck.cpp
StatusCheck.h
Author: aconway
Date: Wed Jan 15 19:28:44 2014
New Revision: 1558538
URL: http://svn.apache.org/r1558538
Log:
QPID-5482: HA Backup becomes useless if a connection-forced error is raised.
The backup will now shut down with a critical error if it receives a connection-forced
error while trying to connect to the primary, so that the problem is obvious.
ha/StatusCheck: don't use the HaBroker outside the StatusCheck constructor,
as it may be deleted. This avoids a core dump if the broker shuts down early
while a status check is ongoing.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1558538&r1=1558537&r2=1558538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Jan 15 19:28:44 2014
@@ -863,6 +863,23 @@ bool BrokerReplicator::unbind(boost::sha
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
bool BrokerReplicator::hasBindings() { return false; }
+// ConnectionObserver methods
+void BrokerReplicator::connection(broker::Connection&) {}
+void BrokerReplicator::opened(broker::Connection&) {}
+
+void BrokerReplicator::closed(broker::Connection& c) {
+ if (link && &c == connect) disconnected();
+}
+
+void BrokerReplicator::forced(broker::Connection& c, const std::string& message) {
+ if (link && &c == link->getConnection()) {
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Connection forced, cluster may be misconfigured: "
+ << message));
+ }
+ closed(c);
+}
+
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1558538&r1=1558537&r2=1558538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Wed Jan 15 19:28:44 2014
@@ -90,10 +90,10 @@ class BrokerReplicator : public broker::
bool hasBindings();
// ConnectionObserver methods
- void connection(broker::Connection&) {}
- void opened(broker::Connection&) {}
- void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); }
- void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); }
+ void connection(broker::Connection&);
+ void opened(broker::Connection&);
+ void closed(broker::Connection&);
+ void forced(broker::Connection&, const std::string& /*message*/);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1558538&r1=1558537&r2=1558538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Wed Jan 15 19:28:44 2014
@@ -146,7 +146,7 @@ bool checkTransition(BrokerStatus from,
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
-// Update managment and send update event.
+ // Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
if (mgmtObject) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1558538&r1=1558537&r2=1558538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Wed Jan 15 19:28:44 2014
@@ -57,20 +57,19 @@ void StatusCheckThread::run() {
try {
// Check for self connections
Variant::Map options, clientProperties;
- clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups
clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
- clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.brokerInfo.asMap();
// Set connection options
- Settings settings(statusCheck.haBroker.getSettings());
+ const Settings& settings = statusCheck.settings;
if (settings.username.size()) options["username"] = settings.username;
if (settings.password.size()) options["password"] = settings.password;
if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
options["client-properties"] = clientProperties;
- sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
- options["heartbeat"] = heartbeat/sys::TIME_SEC;
- c = Connection(url.str(), options);
+ options["heartbeat"] = statusCheck.heartbeat/sys::TIME_SEC;
+ c = Connection(url.str(), options);
c.open();
Session session = c.createSession();
messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
@@ -88,7 +87,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
+ messaging::Duration timeout(statusCheck.heartbeat/sys::TIME_MSEC);
Message response = r.fetch(timeout);
session.acknowledge();
Variant::List contentIn;
@@ -103,17 +102,18 @@ void StatusCheckThread::run() {
}
}
else
- QPID_LOG(error, logPrefix << "Invalid response " << response.getContent())
- } catch(const exception& error) {
- // Its not an error to fail to connect to self.
- if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
- QPID_LOG(warning, logPrefix << error.what());
- }
+ QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
+ } catch(...) {}
try { c.close(); } catch(...) {}
delete this;
}
-StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
+// Note: Don't use hb outside of the constructor, it may be deleted.
+StatusCheck::StatusCheck(HaBroker& hb) :
+ promote(true),
+ settings(hb.getSettings()),
+ heartbeat(hb.getBroker().getOptions().linkHeartbeatInterval),
+ brokerInfo(hb.getBrokerInfo())
{}
StatusCheck::~StatusCheck() {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1558538&r1=1558537&r2=1558538&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Wed Jan 15 19:28:44 2014
@@ -65,7 +65,9 @@ class StatusCheck
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- HaBroker& haBroker;
+ const Settings settings;
+ const sys::Duration heartbeat;
+ const BrokerInfo brokerInfo;
friend class StatusCheckThread;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org