You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/30 01:34:57 UTC
svn commit: r799124 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Connection.cpp ErrorCheck.cpp Quorum_cman.cpp UpdateClient.cpp
Author: aconway
Date: Wed Jul 29 23:34:57 2009
New Revision: 799124
URL: http://svn.apache.org/viewvc?rev=799124&view=rev
Log:
Provide more informative cluster logging at notice level
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul 29 23:34:57 2009
@@ -228,7 +228,8 @@
void Cluster::initialize() {
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " member " << self << " joining "
+ << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -240,7 +241,7 @@
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " add local connection " << c->getId());
+ QPID_LOG(info, *this << " new local connection " << c->getId());
localConnections.insert(c);
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
@@ -250,7 +251,7 @@
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " add shadow connection " << c->getId());
+ QPID_LOG(info, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, either
// discarding or stalled, so deliveredFrame is not processing any
// connection events.
@@ -267,7 +268,7 @@
// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id, Lock&) {
- QPID_LOG(debug, *this << " erasing connection " << id);
+ QPID_LOG(info, *this << " connection closed " << id);
connections.erase(id);
decoder.erase(id);
}
@@ -357,7 +358,7 @@
// This preserves the connection decoder fragments for an update.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer) {
- QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId()
+ QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
<< " to " << MemberId(offer->getUpdatee()));
deliverEventQueue.stop();
}
@@ -440,7 +441,7 @@
connection->deliveredFrame(e);
}
else
- QPID_LOG(debug, *this << " DROP (no connection): " << e);
+ QPID_LOG(trace, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -517,7 +518,7 @@
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+ QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "left: "));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
@@ -553,7 +554,6 @@
}
else { // Joining established group.
state = JOINER;
- QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
elders.erase(self);
@@ -603,7 +603,7 @@
memberUpdate(l);
if (state == CATCHUP && id == self) {
setReady(l);
- QPID_LOG(notice, *this << " caught up, active cluster member");
+ QPID_LOG(notice, *this << " caught up, active cluster member.");
}
}
@@ -635,7 +635,7 @@
assert(state == JOINER);
setClusterId(uuid, l);
state = UPDATEE;
- QPID_LOG(info, *this << " receiving update from " << updater);
+ QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
else {
@@ -662,7 +662,6 @@
if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(info, *this << " retracting offer to " << updatee);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
@@ -679,7 +678,7 @@
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
- QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
+ QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(
@@ -711,13 +710,13 @@
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
- QPID_LOG(info, *this << " received update, starting catch-up");
+ QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
state = JOINER;
- QPID_LOG(info, *this << " re-try joining after retracted update");
+ QPID_LOG(notice, *this << " update retracted, sending new update request");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
deliverEventQueue.start();
}
@@ -729,7 +728,7 @@
}
void Cluster::updateOutDone(Lock& l) {
- QPID_LOG(info, *this << " sent update");
+ QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
mcast.release();
@@ -834,9 +833,9 @@
"INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << cluster.self << "(" << STATE[cluster.state];
+ o << "cluster:" << STATE[cluster.state];
if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
- return o << ")";
+ return o;
}
MemberId Cluster::getId() const {
@@ -863,7 +862,7 @@
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-id = " << clusterId);
+ QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
@@ -875,7 +874,7 @@
// ErrorCheck class) then we have processed succesfully past the
// point of the error.
if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
- QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally.");
+ QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jul 29 23:34:57 2009
@@ -439,13 +439,13 @@
void Connection::exchange(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
- QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());
+ QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
}
void Connection::queue(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
- QPID_LOG(debug, cluster << " decoded queue " << q->getName());
+ QPID_LOG(debug, cluster << " updated queue " << q->getName());
}
void Connection::sessionError(uint16_t , const std::string& msg) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Wed Jul 29 23:34:57 2009
@@ -57,7 +57,7 @@
QPID_LOG(error, cluster
<< (type == ERROR_TYPE_SESSION ? " channel" : " connection")
<< " error " << frameSeq << " on " << c << ": " << msg
- << " (unresolved: " << unresolved << ")");
+ << " must be resolved with: " << unresolved);
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
// If there are already frames queued up by a previous error, review
@@ -87,8 +87,8 @@
throw Exception("Aborted by failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
- QPID_LOG(debug, cluster << " error " << frameSeq
- << " outcome agrees with " << i->getMemberId());
+ QPID_LOG(notice, cluster << " error " << frameSeq
+ << " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
}
@@ -117,10 +117,11 @@
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+ QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+ QPID_LOG(notice, cluster << " error " << frameSeq
+ << " must be resolved with " << unresolved);
}
EventFrame ErrorCheck::getNext() {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Wed Jul 29 23:34:57 2009
@@ -31,7 +31,6 @@
Quorum::~Quorum() { if (cman) cman_finish(cman); }
void Quorum::init() {
- QPID_LOG(info, "Waiting for cluster quorum");
enable = true;
cman = cman_init(0);
if (cman == 0) throw ErrnoException("Can't connect to cman service");
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jul 29 23:34:57 2009
@@ -124,7 +124,8 @@
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
+ QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org