You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/30 01:34:57 UTC

svn commit: r799124 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Connection.cpp ErrorCheck.cpp Quorum_cman.cpp UpdateClient.cpp

Author: aconway
Date: Wed Jul 29 23:34:57 2009
New Revision: 799124

URL: http://svn.apache.org/viewvc?rev=799124&view=rev
Log:
Provide more informative cluster logging at notice level

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul 29 23:34:57 2009
@@ -228,7 +228,8 @@
 void Cluster::initialize() {
     if (myUrl.empty())
         myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
-    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
+    QPID_LOG(notice, *this << " member " << self <<  " joining "
+             << name << " with url=" << myUrl);
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
@@ -240,7 +241,7 @@
 
 // Called in connection thread to insert a client connection.
 void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
-    QPID_LOG(debug, *this << " add local connection " << c->getId());
+    QPID_LOG(info, *this << " new local connection " << c->getId());
     localConnections.insert(c);
     assert(c->getId().getMember() == self);
     // Announce the connection to the cluster.
@@ -250,7 +251,7 @@
 
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
-    QPID_LOG(debug, *this << " add shadow connection " << c->getId());
+    QPID_LOG(info, *this << " new shadow connection " << c->getId());
     // Safe to use connections here because we're pre-catchup, either
     // discarding or stalled, so deliveredFrame is not processing any
     // connection events.
@@ -267,7 +268,7 @@
 
 // Called by Connection::deliverClose() in deliverFrameQueue thread.
 void Cluster::erase(const ConnectionId& id, Lock&) {
-    QPID_LOG(debug, *this << " erasing connection " << id);
+    QPID_LOG(info, *this << " connection closed " << id);
     connections.erase(id);
     decoder.erase(id);
 }
@@ -357,7 +358,7 @@
         // This preserves the connection decoder fragments for an update.
         const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
         if (offer) {
-            QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId()
+            QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
                      << " to " << MemberId(offer->getUpdatee()));
             deliverEventQueue.stop();
         }
@@ -440,7 +441,7 @@
             connection->deliveredFrame(e);
         }
         else
-            QPID_LOG(debug, *this << " DROP (no connection): " << e);
+            QPID_LOG(trace, *this << " DROP (no connection): " << e);
     }
     else // Drop connection frames while state < CATCHUP
         QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -517,7 +518,7 @@
         broker.setRecovery(nCurrent == 1);
         initialized = true;
     }
-    QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+    QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
              << AddrList(left, nLeft, "left: "));
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
@@ -553,7 +554,6 @@
         }
         else {                  // Joining established group.
             state = JOINER;
-            QPID_LOG(info, *this << " joining cluster: " << map);
             mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
             elders = map.getAlive();
             elders.erase(self);
@@ -603,7 +603,7 @@
         memberUpdate(l);
     if (state == CATCHUP && id == self) {
         setReady(l);
-        QPID_LOG(notice, *this << " caught up, active cluster member");
+        QPID_LOG(notice, *this << " caught up, active cluster member.");
     }
 }
 
@@ -635,7 +635,7 @@
         assert(state == JOINER);
         setClusterId(uuid, l);
         state = UPDATEE;
-        QPID_LOG(info, *this << " receiving update from " << updater);
+        QPID_LOG(notice, *this << " receiving update from " << updater);
         checkUpdateIn(l);
     }
     else {
@@ -662,7 +662,6 @@
     if (updater == self) {
         assert(state == OFFER);
         if (url)  {             // My offer was first.
-            QPID_LOG(info, *this << " retracting offer to " << updatee);
             if (updateThread.id())
                 updateThread.join(); // Join the previous updateThread to avoid leaks.
             updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
@@ -679,7 +678,7 @@
     if (state == LEFT) return;
     assert(state == OFFER);
     state = UPDATER;
-    QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
+    QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
     if (updateThread.id())
         updateThread.join(); // Join the previous updateThread to avoid leaks.
     updateThread = Thread(
@@ -711,13 +710,13 @@
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         discarding = false;     // ok to set, we're stalled for update.
-        QPID_LOG(info, *this << " received update, starting catch-up");
+        QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
         deliverEventQueue.start();
     }
     else if (updateRetracted) { // Update was retracted, request another update
         updateRetracted = false;
         state = JOINER;
-        QPID_LOG(info, *this << " re-try joining after retracted update");
+        QPID_LOG(notice, *this << " update retracted, sending new update request");
         mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
         deliverEventQueue.start();
     }
@@ -729,7 +728,7 @@
 }
 
 void Cluster::updateOutDone(Lock& l) {
-    QPID_LOG(info, *this << " sent update");
+    QPID_LOG(notice, *this << " update sent");
     assert(state == UPDATER);
     state = READY;
     mcast.release();
@@ -834,9 +833,9 @@
         "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
     };
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
-    o << cluster.self << "(" << STATE[cluster.state];
+    o << "cluster:" << STATE[cluster.state];
     if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
-    return o << ")";
+    return o;
 }
 
 MemberId Cluster::getId() const {
@@ -863,7 +862,7 @@
         mgmtObject->set_clusterID(clusterId.str());
         mgmtObject->set_memberID(stream.str());
     }
-    QPID_LOG(debug, *this << " cluster-id = " << clusterId);
+    QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
 }
 
 void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
@@ -875,7 +874,7 @@
     // ErrorCheck class) then we have processed succesfully past the
     // point of the error.
     if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
-        QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally.");
+        QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
         mcast.mcastControl(
             ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jul 29 23:34:57 2009
@@ -439,13 +439,13 @@
 void Connection::exchange(const std::string& encoded) {
     Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
     broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
-    QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());    
+    QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
 }
 
 void Connection::queue(const std::string& encoded) {
     Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
     broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
-    QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
+    QPID_LOG(debug, cluster << " updated queue " << q->getName());
 }
 
 void Connection::sessionError(uint16_t , const std::string& msg) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Wed Jul 29 23:34:57 2009
@@ -57,7 +57,7 @@
     QPID_LOG(error, cluster
              << (type == ERROR_TYPE_SESSION ? " channel" : " connection")
              << " error " << frameSeq << " on " << c << ": " << msg
-             << " (unresolved: " << unresolved << ")");
+             << " must be resolved with: " << unresolved);
     mcast.mcastControl(
         ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
     // If there are already frames queued up by a previous error, review
@@ -87,8 +87,8 @@
                 throw Exception("Aborted by failure that did not occur on all replicas");
             }
             else {              // his error is worse/same as mine.
-                QPID_LOG(debug, cluster << " error " << frameSeq
-                         << " outcome agrees with " << i->getMemberId());
+                QPID_LOG(notice, cluster << " error " << frameSeq
+                         << " resolved with " << i->getMemberId());
                 unresolved.erase(i->getMemberId());
                 checkResolved();
             }
@@ -117,10 +117,11 @@
 void ErrorCheck::checkResolved() {
     if (unresolved.empty()) {   // No more potentially conflicted members, we're clear.
         type = ERROR_TYPE_NONE;
-        QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+        QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
     }
     else 
-        QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+        QPID_LOG(notice, cluster << " error " << frameSeq
+                 << " must be resolved with " << unresolved);
 }
 
 EventFrame ErrorCheck::getNext() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Wed Jul 29 23:34:57 2009
@@ -31,7 +31,6 @@
 Quorum::~Quorum() { if (cman) cman_finish(cman); }
 
 void Quorum::init() {
-    QPID_LOG(info, "Waiting for cluster quorum");
     enable = true;
     cman = cman_init(0);
     if (cman == 0) throw ErrnoException("Can't connect to cman service");

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=799124&r1=799123&r2=799124&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jul 29 23:34:57 2009
@@ -124,7 +124,8 @@
 }
 
 void UpdateClient::update() {
-    QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
+    QPID_LOG(debug, updaterId << " updating state to " << updateeId
+             << " at " << updateeUrl);
     Broker& b = updaterBroker;
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org