You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") in the original page and is not preserved in this copy.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/08/06 19:41:19 UTC
svn commit: r801740 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Cluster.h ClusterPlugin.cpp ClusterSettings.h Cpg.cpp Cpg.h
OutputInterceptor.cpp PollerDispatch.cpp PollerDispatch.h Quorum_cman.cpp
Quorum_cman.h Quorum_null.h
Author: aconway
Date: Thu Aug 6 17:41:18 2009
New Revision: 801740
URL: http://svn.apache.org/viewvc?rev=801740&view=rev
Log:
Fix cman integration to exit immediately on loss of quorum.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug 6 17:41:18 2009
@@ -190,6 +190,7 @@
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
+ quorum(boost::bind(&Cluster::leave, this)),
initialized(false),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -214,7 +215,6 @@
// Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
- if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -226,10 +226,10 @@
}
void Cluster::initialize() {
+ if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " member " << self << " joining "
- << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -404,6 +404,7 @@
LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
+ if (state == LEFT) return;
EventFrame e(efConst);
const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
if (offer && error.isUnresolved()) {
@@ -510,7 +511,7 @@
const cpg_name */*group*/,
const cpg_address *current, int nCurrent,
const cpg_address *left, int nLeft,
- const cpg_address */*joined*/, int /*nJoined*/)
+ const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
if (state == INIT) { // First config change.
@@ -518,8 +519,11 @@
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
- << AddrList(left, nLeft, "left: "));
+ QPID_LOG(notice, *this << " membership change: "
+ << AddrList(current, nCurrent) << "("
+ << AddrList(joined, nJoined, "joined: ")
+ << AddrList(left, nLeft, "left: ")
+ << ")");
std::string addresses;
for (const cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
@@ -833,9 +837,9 @@
"INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << "cluster:" << STATE[cluster.state];
+ o << "cluster(" << cluster.self << " " << STATE[cluster.state];
if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
- return o;
+ return o << ")";;
}
MemberId Cluster::getId() const {
@@ -846,14 +850,6 @@
return broker; // Immutable, no need to lock.
}
-void Cluster::checkQuorum() {
- if (!quorum.isQuorate()) {
- QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
- leave();
- throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
- }
-}
-
void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
if (mgmtObject) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Aug 6 17:41:18 2009
@@ -101,8 +101,6 @@
broker::Broker& getBroker() const;
Multicaster& getMulticast() { return mcast; }
- void checkQuorum();
-
const ClusterSettings& getSettings() const { return settings; }
void deliverFrame(const EventFrame&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Aug 6 17:41:18 2009
@@ -72,7 +72,7 @@
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
- // FIXME aconway 2009-05-20: temporary
+ // TODO aconway 2009-05-20: temporary, remove
("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Thu Aug 6 17:41:18 2009
@@ -37,7 +37,7 @@
bool checkErrors;
ClusterSettings() : quorum(false), readMax(10),
- checkErrors(true) // FIXME aconway 2009-05-20: temporary
+ checkErrors(true) // TODO aconway 2009-05-20: remove this option.
{}
Url getUrl(uint16_t port) const {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Aug 6 17:41:18 2009
@@ -105,7 +105,7 @@
QPID_LOG(info, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
- int retries = 6;
+ int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection.
while (err == CPG_ERR_TRY_AGAIN && --retries) {
QPID_LOG(notice, "Re-trying CPG initialization.");
sys::sleep(5);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Thu Aug 6 17:41:18 2009
@@ -20,7 +20,6 @@
*/
#include "qpid/Exception.h"
-#include "qpid/cluster/Dispatchable.h"
#include "qpid/cluster/types.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Aug 6 17:41:18 2009
@@ -46,7 +46,6 @@
void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
- parent.getCluster().checkQuorum();
{
sys::Mutex::ScopedLock l(lock);
next->send(f);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Thu Aug 6 17:41:18 2009
@@ -33,15 +33,18 @@
boost::bind(&PollerDispatch::dispatch, this, _1), // read
0, // write
boost::bind(&PollerDispatch::disconnect, this, _1) // disconnect
- )
+ ),
+ started(false)
{}
+/** Stop the poller watch on dispatchHandle, but only if start() registered it. */
PollerDispatch::~PollerDispatch() {
- dispatchHandle.stopWatch();
+ // started is set by start(); skip stopWatch() for a handle that was never watched.
+ if (started)
+ dispatchHandle.stopWatch();
}
+/**
+ * Register dispatchHandle with the poller. started is set only after
+ * startWatch() returns, so the destructor never stops a watch that was not
+ * successfully started.
+ */
void PollerDispatch::start() {
dispatchHandle.startWatch(poller);
+ started = true;
}
// Entry point: called by IO to dispatch CPG events.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h Thu Aug 6 17:41:18 2009
@@ -51,6 +51,7 @@
boost::shared_ptr<sys::Poller> poller;
boost::function<void()> onError;
sys::DispatchHandleRef dispatchHandle;
+ bool started;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Thu Aug 6 17:41:18 2009
@@ -18,28 +18,86 @@
* under the License.
*
*/
+
#include "qpid/cluster/Quorum_cman.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/log/Statement.h"
#include "qpid/Options.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/posix/PrivatePosix.h"
namespace qpid {
namespace cluster {
-Quorum::Quorum() : enable(false), cman(0) {}
+namespace {
+
+// Error callback used by cmanCallbackFn, Quorum::dispatch() and
+// Quorum::disconnect(). It lives at file scope because cmanCallbackFn is a
+// plain function pointer handed to libcman and reads it from here;
+// Quorum's constructor assigns it.
boost::function<void()> errorFn;
-Quorum::~Quorum() { if (cman) cman_finish(cman); }
+/**
+ * libcman notification callback, registered by Quorum::start() through
+ * cman_start_notification().
+ *
+ * On CMAN_REASON_STATECHANGE with arg == 0 (treated here as loss of quorum)
+ * it logs, invokes errorFn if one is set, and stops further cman
+ * notifications.
+ *
+ * This function is invoked from inside libcman (during cman_dispatch()),
+ * i.e. through C stack frames, so no exception may escape it: failures of
+ * errorFn are caught and logged here instead of unwinding through C code.
+ */
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) {
+    if (reason == CMAN_REASON_STATECHANGE && arg == 0) {
+        QPID_LOG(critical, "Lost contact with cluster quorum.");
+        try {
+            if (errorFn) errorFn();
+        } catch (const std::exception& e) {
+            QPID_LOG(critical, "Error handling loss of quorum: " << e.what());
+        } catch (...) {
+            QPID_LOG(critical, "Unknown error handling loss of quorum.");
+        }
+        cman_stop_notification(handle);
+    }
+}
+}
+
+/**
+ * @param err Callback invoked on loss of quorum or of the cman connection.
+ *
+ * err is stored in the file-scope errorFn, which cmanCallbackFn, dispatch()
+ * and disconnect() read; constructing another Quorum replaces it for all
+ * instances.
+ *
+ * cmanFd starts at -1 ("no descriptor yet"): 0 is a valid descriptor number
+ * and must not double as a sentinel.
+ */
+Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(-1) {
+    errorFn = err;
+}
+
+/** Release the dispatch handle, then close the cman connection if one was opened. */
+Quorum::~Quorum() {
+ // Drop the poller watch before cman_finish() so the descriptor is not watched
+ // after cman releases it. NOTE(review): relies on ~DispatchHandleRef stopping
+ // the watch; PollerDispatch calls stopWatch() explicitly - confirm.
+ dispatchHandle.reset();
+ if (cman) cman_finish(cman);
+}
-void Quorum::init() {
+/**
+ * Connect to cman, block until the cluster is quorate, then register
+ * cmanCallbackFn for cman notifications and watch the cman descriptor on p.
+ * @param p Poller on which the cman descriptor is watched.
+ * @throws ErrnoException if cman cannot be reached or notifications cannot be
+ *         registered; getFd() may also throw.
+ */
+void Quorum::start(boost::shared_ptr<sys::Poller> p) {
+ poller = p;
enable = true;
+ QPID_LOG(debug, "Connecting to quorum service.");
cman = cman_init(0);
if (cman == 0) throw ErrnoException("Can't connect to cman service");
if (!cman_is_quorate(cman)) {
QPID_LOG(notice, "Waiting for cluster quorum.");
+ // NOTE(review): no timeout - this loop blocks the caller until cman reports quorum.
while(!cman_is_quorate(cman)) sys::sleep(5);
}
+ int err = cman_start_notification(cman, cmanCallbackFn);
+ if (err != 0) throw ErrnoException("Can't register for cman notifications");
+ watch(getFd());
}
-bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
+/**
+ * Watch descriptor fd on the poller, replacing any previous dispatch handle.
+ * Read events go to dispatch(), disconnect events to disconnect().
+ */
+void Quorum::watch(int fd) {
+ cmanFd = fd;
+ // The new handle is constructed before reset() deletes the previous one.
+ // NOTE(review): when called from dispatch() this deletes the handle whose
+ // callback is currently running - confirm DispatchHandleRef permits that.
+ dispatchHandle.reset(
+ new sys::DispatchHandleRef(
+ sys::PosixIOHandle(cmanFd),
+ boost::bind(&Quorum::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Quorum::disconnect, this, _1) // disconnect
+ ));
+ dispatchHandle->startWatch(poller);
+}
+
+/**
+ * Return cman's current file descriptor.
+ * @throws ErrnoException if cman_get_fd() reports an error.
+ */
+int Quorum::getFd() {
+    int fd = cman_get_fd(cman);
+    // cman_get_fd() signals failure with -1 (errno set); 0 is a legitimate
+    // descriptor number and must not be treated as an error.
+    if (fd < 0) throw ErrnoException("Can't get cman file descriptor");
+    return fd;
+}
+
+/**
+ * Poller read callback: process all pending cman events (which may invoke
+ * cmanCallbackFn), then re-watch if cman's descriptor has changed.
+ * Any failure is logged and reported through errorFn.
+ */
+void Quorum::dispatch(sys::DispatchHandle&) {
+    try {
+        // cman_dispatch() returns -1 on error (e.g. the cman daemon went away);
+        // previously that result was silently ignored.
+        if (cman_dispatch(cman, CMAN_DISPATCH_ALL) < 0)
+            throw ErrnoException("Error in cman dispatch");
+        int fd = getFd();
+        if (fd != cmanFd) watch(fd);
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, "Error in quorum dispatch: " << e.what());
+        // Calling an empty boost::function throws; guard as cmanCallbackFn does.
+        if (errorFn) errorFn();
+    }
+}
+
+/** Poller disconnect callback for the cman descriptor: log and report via errorFn. */
+void Quorum::disconnect(sys::DispatchHandle&) {
+    QPID_LOG(critical, "Disconnected from quorum service");
+    // Calling an empty boost::function throws bad_function_call; guard it the
+    // same way cmanCallbackFn does.
+    if (errorFn) errorFn();
+}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h Thu Aug 6 17:41:18 2009
@@ -22,26 +22,40 @@
*
*/
+#include <qpid/sys/DispatchHandle.h>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <memory>
+
extern "C" {
#include <libcman.h>
}
namespace qpid {
-
-class Options;
+namespace sys {
+class Poller;
+}
namespace cluster {
+class Cluster;
+/**
+ * cman-based quorum monitor.
+ *
+ * start() connects to cman, waits until the cluster is quorate and then
+ * watches the cman descriptor on the given poller. The onError callback given
+ * to the constructor is invoked when quorum is lost or the cman connection
+ * fails.
+ *
+ * Not copyable: it owns the cman handle (closed with cman_finish() in the
+ * destructor) and the dispatch handle.
+ */
class Quorum {
public:
- Quorum();
+ Quorum(boost::function<void ()> onError);
~Quorum();
- void init();
- bool isQuorate();
+ void start(boost::shared_ptr<sys::Poller>);
private:
+ // Declared but never defined: a copy would share the cman handle and both
+ // destructors would call cman_finish() on it.
+ Quorum(const Quorum&);
+ Quorum& operator=(const Quorum&);
+
+ void dispatch(sys::DispatchHandle&);
+ void disconnect(sys::DispatchHandle&);
+ int getFd();
+ void watch(int fd);
+
bool enable;
cman_handle_t cman;
+ int cmanFd;
+ std::auto_ptr<sys::DispatchHandleRef> dispatchHandle;
+ boost::shared_ptr<sys::Poller> poller;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h Thu Aug 6 17:41:18 2009
@@ -21,15 +21,20 @@
* under the License.
*
*/
+
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+
namespace qpid {
namespace cluster {
+class Cluster;
/** Null implementation of quorum. */
class Quorum {
public:
- void init() {}
- bool isQuorate() { return true; }
+ // Both members are no-ops: the error callback is discarded and never invoked.
+ // NOTE(review): sys::Poller is not declared in this header; it compiles only
+ // when a declaration of qpid::sys::Poller is visible before this header is
+ // included. Consider forward-declaring it as Quorum_cman.h does.
+ Quorum(boost::function<void ()>) {}
+ void start(boost::shared_ptr<sys::Poller>) {}
};
#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org