You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/26 18:37:16 UTC
svn commit: r720924 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/sys/posix/Shlib.cpp
tests/Makefile.am tests/cluster_test.cpp tests/perfdist
Author: aconway
Date: Wed Nov 26 09:37:16 2008
New Revision: 720924
URL: http://svn.apache.org/viewvc?rev=720924&view=rev
Log:
Cluster.cpp: Fixed last-node-standing logic, better logging.
Shlib.cpp: added file name to error messages.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 26 09:37:16 2008
@@ -102,7 +102,8 @@
mcastId(0),
mgmtObject(0),
state(INIT),
- lastSize(1)
+ lastSize(0),
+ lastBroker(false)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -115,7 +116,7 @@
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name.str());
+ QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -198,9 +199,8 @@
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
QPID_LOG(notice, *this << " leaving cluster " << name.str());
-
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
if (!deliverQueue.isStopped()) deliverQueue.stop();
try { cpg.leave(name); }
catch (const std::exception& e) {
@@ -258,47 +258,48 @@
deliverQueue.push(e); // Otherwise enqueue for processing.
}
+// Entry point: called when deliverQueue has events to process.
void Cluster::delivered(const Event& e) {
- Lock l(lock);
- delivered(e,l);
+ try {
+ Lock l(lock);
+ delivered(e,l);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+ leave();
+ }
+
}
void Cluster::delivered(const Event& e, Lock& l) {
- try {
- Buffer buf(e);
- AMQFrame frame;
- if (e.isCluster()) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- ClusterDispatcher dispatch(*this, e.getMemberId(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
- }
- else { // e.isConnection()
- if (state == NEWBIE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- }
- else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
- if (!connection) return;
- if (e.getType() == CONTROL) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- connection->delivered(frame);
- }
- }
- else {
- QPID_LOG(trace, *this << " DLVR: " << e);
- connection->deliverBuffer(buf);
+ Buffer buf(e);
+ AMQFrame frame;
+ if (e.isCluster()) {
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ ClusterDispatcher dispatch(*this, e.getMemberId(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
+ }
+ }
+ else { // e.isConnection()
+ if (state == NEWBIE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ }
+ else {
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (!connection) return;
+ if (e.getType() == CONTROL) {
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ connection->delivered(frame);
}
}
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ connection->deliverBuffer(buf);
+ }
}
}
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivered: " << e.what());
- leave(l);
- }
}
struct AddrList {
@@ -328,23 +329,24 @@
return o << a.suffix;
}
+// Entry point: called by IO to dispatch CPG events.
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
h.rewatch();
- }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster deliver: " << e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
leave();
}
}
+// Entry point: called if disconnected from CPG.
void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, *this << " disconnected from cluster, shutting down");
+ QPID_LOG(critical, *this << " error disconnected from cluster");
broker.shutdown();
}
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
@@ -372,16 +374,16 @@
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true);
- QPID_LOG(info, *this << " first in cluster at " << myUrl);
state = READY;
+ QPID_LOG(notice, *this << " first in cluster");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
}
else { // Joining established group.
state = NEWBIE;
+ QPID_LOG(info, *this << " request state dump");
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
- QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
else if (state >= READY && memberChange)
@@ -394,7 +396,7 @@
void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
- QPID_LOG(debug, *this << " send dump-offer to " << id);
+ QPID_LOG(info, *this << " send dump-offer to " << id);
mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
}
}
@@ -424,8 +426,8 @@
if (map.ready(id, Url(url)))
memberUpdate(l);
if (state == CATCHUP && id == myId) {
- QPID_LOG(debug, *this << " caught-up, going to ready mode.");
state = READY;
+ QPID_LOG(notice, *this << " caught up");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
mcastQueue.clear();
@@ -442,16 +444,16 @@
dumpStart(myId, dumpee, url->str(), l);
}
else { // Another offer was first.
- QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
state = READY;
+ QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
}
else if (dumpee == myId && url) {
assert(state == NEWBIE);
- QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
setClusterId(uuid);
state = DUMPEE;
+ QPID_LOG(info, *this << " receiving dump from " << dumper);
deliverQueue.stop();
checkDumpIn(l);
}
@@ -465,8 +467,8 @@
Url url(urlStr);
assert(state == OFFER);
state = DUMPER;
+ QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
deliverQueue.stop();
- QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
@@ -484,10 +486,10 @@
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
// Don't flush the mcast queue till we are READY, on self-deliver.
state = CATCHUP;
+ QPID_LOG(info, *this << " received dump, starting catch-up");
deliverQueue.start();
}
}
@@ -498,16 +500,16 @@
}
void Cluster::dumpOutDone(Lock& l) {
- QPID_LOG(debug, *this << " finished sending dump.");
assert(state == DUMPER);
state = READY;
+ QPID_LOG(info, *this << " sent dump");
deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}
void Cluster::dumpOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending state dump: " << e.what());
+ QPID_LOG(error, *this << " error sending dump: " << e.what());
dumpOutDone(l);
}
@@ -529,9 +531,9 @@
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(Lock&) {
+void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
- leave();
+ leave(l);
}
void Cluster::stopFullCluster(Lock& l) {
@@ -541,27 +543,27 @@
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(debug, *this << " member update, map=" << map);
- std::vector<Url> vectUrl = getUrls(l);
- size_t size = vectUrl.size();
-
- failoverExchange->setUrls(vectUrl);
+ std::vector<Url> urls = getUrls(l);
+ size_t size = urls.size();
+ failoverExchange->setUrls(urls);
+
+ if (size == 1 && lastSize > 1 && state >= READY) {
+ QPID_LOG(info, *this << " last broker standing, update queue policies");
+ lastBroker = true;
+ broker.getQueues().updateQueueClusterState(true);
+ }
+ else if (size > 1 && lastBroker) {
+ QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies");
+ lastBroker = false;
+ broker.getQueues().updateQueueClusterState(false);
+ }
+ lastSize = size;
if (mgmtObject) {
-
- if (lastSize != size && size == 1){
- QPID_LOG(info, *this << " last node standing, updating queue policies.");
- broker.getQueues().updateQueueClusterState(true);
- }
- else if (lastSize != size && size > 1) {
- QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size);
- broker.getQueues().updateQueueClusterState(false);
- }
- lastSize = size;
-
mgmtObject->set_clusterSize(size);
string urlstr;
- for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
- if (iter != vectUrl.begin()) urlstr += "\n";
+ for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
+ if (iter != urls.begin()) urlstr += "\n";
urlstr += iter->str();
}
mgmtObject->set_members(urlstr);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Nov 26 09:37:16 2008
@@ -213,6 +213,7 @@
boost::optional<ClusterMap> dumpedMap;
size_t lastSize;
+ bool lastBroker;
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp Wed Nov 26 09:37:16 2008
@@ -31,7 +31,7 @@
handle = ::dlopen(name, RTLD_NOW);
const char* error = ::dlerror();
if (error) {
- throw Exception(QPID_MSG(error));
+ throw Exception(QPID_MSG(error << ": " << name));
}
}
@@ -52,7 +52,7 @@
void* sym = ::dlsym(handle, name);
const char* error = ::dlerror();
if (error)
- throw Exception(QPID_MSG(error));
+ throw Exception(QPID_MSG(error << ": " << name));
return sym;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Nov 26 09:37:16 2008
@@ -197,6 +197,10 @@
CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
+check_PROGRAMS+=tsxtest
+tsxtest_SOURCES=tsxtest.cpp
+tsxtest_LDADD=$(lib_client)
+
# FIXME aconway 2008-05-23: Disabled interop_runner because it uses
# the obsolete Channel class. Convert to Session and re-enable.
#
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Nov 26 09:37:16 2008
@@ -120,6 +120,7 @@
std::string prefix = os.str();
const char* argv[] = {
"qpidd " __FILE__ ,
+ "--no-module-dir",
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perfdist?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perfdist (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perfdist Wed Nov 26 09:37:16 2008
@@ -10,10 +10,10 @@
usage: $0 <perftest-args> -- <client-hosts ...> [ --- <broker hosts...> ]
Client & broker hosts can also be set in env vars CLIENTS and BROKERS.
-Run perftest with clients running on the clients and brokers running
-on the specified hosts. Clients are assigned to client hosts round
-robin: publishers first, then subscribers. If there are multiple
-brokers (for cluster tests) clients connect to them round robin.
+Run perftest clients on the client hosts against brokers on the broker
+hosts. Clients are assigned to client hosts round robin: publishers
+first, then subscribers. If there are multiple brokers (for cluster
+tests) clients connect to them round robin.
Broker hosts can be listed with -b in perftest-args or after ---
at the end of the arguments.