You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/26 18:37:16 UTC

svn commit: r720924 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/sys/posix/Shlib.cpp tests/Makefile.am tests/cluster_test.cpp tests/perfdist

Author: aconway
Date: Wed Nov 26 09:37:16 2008
New Revision: 720924

URL: http://svn.apache.org/viewvc?rev=720924&view=rev
Log:
Cluster.cpp: Fixed last-node-standing logic, better logging.
Shlib.cpp: added file name to error messages.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perfdist

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 26 09:37:16 2008
@@ -102,7 +102,8 @@
     mcastId(0),
     mgmtObject(0),
     state(INIT),
-    lastSize(1)
+    lastSize(0),
+    lastBroker(false)
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
@@ -115,7 +116,7 @@
     failoverExchange.reset(new FailoverExchange(this));
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start();
-    QPID_LOG(notice, *this << " joining cluster " << name.str());
+    QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
     if (useQuorum) quorum.init();
     cpg.join(name);
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -198,9 +199,8 @@
 void Cluster::leave(Lock&) { 
     if (state != LEFT) {
         state = LEFT;
-        if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         QPID_LOG(notice, *this << " leaving cluster " << name.str());
-
+        if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         if (!deliverQueue.isStopped()) deliverQueue.stop();
         try { cpg.leave(name); }
         catch (const std::exception& e) {
@@ -258,47 +258,48 @@
     deliverQueue.push(e);       // Otherwise enqueue for processing.
 }
 
+// Entry point: called when deliverQueue has events to process.
 void Cluster::delivered(const Event& e) {
-    Lock l(lock);
-    delivered(e,l);
+    try {
+        Lock l(lock);
+        delivered(e,l);
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+        leave();
+    }
+
 }
 
 void Cluster::delivered(const Event& e, Lock& l) {
-    try {
-        Buffer buf(e);
-        AMQFrame frame;
-        if (e.isCluster())  {
-            while (frame.decode(buf)) {
-                QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
-                ClusterDispatcher dispatch(*this, e.getMemberId(), l);
-                if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
-                    throw Exception(QPID_MSG("Invalid cluster control"));
-            }
-        }
-        else {                      // e.isConnection()
-            if (state == NEWBIE) {
-                QPID_LOG(trace, *this << " DROP: " << e);
-            }
-            else {
-                boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
-                if (!connection) return;
-                if (e.getType() == CONTROL) {              
-                    while (frame.decode(buf)) {
-                        QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
-                        connection->delivered(frame);
-                    }
-                }
-                else  {
-                    QPID_LOG(trace, *this << " DLVR: " << e);
-                    connection->deliverBuffer(buf);
+    Buffer buf(e);
+    AMQFrame frame;
+    if (e.isCluster())  {
+        while (frame.decode(buf)) {
+            QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+            ClusterDispatcher dispatch(*this, e.getMemberId(), l);
+            if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+                throw Exception(QPID_MSG("Invalid cluster control"));
+        }
+    }
+    else {                      // e.isConnection()
+        if (state == NEWBIE) {
+            QPID_LOG(trace, *this << " DROP: " << e);
+        }
+        else {
+            boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+            if (!connection) return;
+            if (e.getType() == CONTROL) {              
+                while (frame.decode(buf)) {
+                    QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+                    connection->delivered(frame);
                 }
             }
+            else  {
+                QPID_LOG(trace, *this << " DLVR: " << e);
+                connection->deliverBuffer(buf);
+            }
         }
     }
-    catch (const std::exception& e) {
-        QPID_LOG(critical, *this << " error in cluster delivered: " << e.what());
-        leave(l);
-    }
 }
 
 struct AddrList {
@@ -328,23 +329,24 @@
     return o << a.suffix;
 }
 
+// Entry point: called by IO to dispatch CPG events.
 void Cluster::dispatch(sys::DispatchHandle& h) {
     try {
         cpg.dispatchAll();
         h.rewatch();
-    }
-    catch (const std::exception& e) {
-        QPID_LOG(critical, *this << " error in cluster deliver: " << e.what());
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
         leave();
     }
 }
 
+// Entry point: called if disconnected from  CPG.
 void Cluster::disconnect(sys::DispatchHandle& ) {
-    QPID_LOG(critical, *this << " disconnected from cluster, shutting down");
+    QPID_LOG(critical, *this << " error disconnected from cluster");
     broker.shutdown();
 }
 
-void Cluster::configChange (
+void Cluster::configChange ( 
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
     cpg_address *current, int nCurrent,
@@ -372,16 +374,16 @@
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
             setClusterId(true);
-            QPID_LOG(info, *this << " first in cluster at " << myUrl);
             state = READY;
+            QPID_LOG(notice, *this << " first in cluster");
             if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
             map = ClusterMap(myId, myUrl, true);
             memberUpdate(l);
         }
         else {                  // Joining established group.
             state = NEWBIE;
+            QPID_LOG(info, *this << " request state dump");
             mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
-            QPID_LOG(debug, *this << " send dump-request " << myUrl);
         }
     }
     else if (state >= READY && memberChange)
@@ -394,7 +396,7 @@
 void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
     if (state == READY && map.isNewbie(id)) {
         state = OFFER;
-        QPID_LOG(debug, *this << " send dump-offer to " << id);
+        QPID_LOG(info, *this << " send dump-offer to " << id);
         mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
     }
 }
@@ -424,8 +426,8 @@
     if (map.ready(id, Url(url))) 
         memberUpdate(l);
     if (state == CATCHUP && id == myId) {
-        QPID_LOG(debug, *this << " caught-up, going to ready mode.");
         state = READY;
+        QPID_LOG(notice, *this << " caught up");
         if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
         for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
         mcastQueue.clear();
@@ -442,16 +444,16 @@
             dumpStart(myId, dumpee, url->str(), l);
         }
         else {                  // Another offer was first.
-            QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
             state = READY;
+            QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
             tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
         }
     }
     else if (dumpee == myId && url) {
         assert(state == NEWBIE);
-        QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
         setClusterId(uuid);
         state = DUMPEE;
+        QPID_LOG(info, *this << " receiving dump from " << dumper);
         deliverQueue.stop();
         checkDumpIn(l);
     }
@@ -465,8 +467,8 @@
     Url url(urlStr);
     assert(state == OFFER);
     state = DUMPER;
+    QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
     deliverQueue.stop();
-    QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
     dumpThread = Thread(
         new DumpClient(myId, dumpee, url, broker, map, getConnections(l), 
@@ -484,10 +486,10 @@
     if (state == LEFT) return;
     if (state == DUMPEE && dumpedMap) {
         map = *dumpedMap;
-        QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map);
         mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
         // Don't flush the mcast queue till we are READY, on self-deliver.
         state = CATCHUP;
+        QPID_LOG(info, *this << " received dump, starting catch-up");
         deliverQueue.start();
     }
 }
@@ -498,16 +500,16 @@
 }
 
 void Cluster::dumpOutDone(Lock& l) {
-    QPID_LOG(debug, *this  << " finished sending dump.");
     assert(state == DUMPER);
     state = READY;
+    QPID_LOG(info, *this << " sent dump");
     deliverQueue.start();
     tryMakeOffer(map.firstNewbie(), l); // Try another offer
 }
 
 void Cluster::dumpOutError(const std::exception& e)  {
     Monitor::ScopedLock l(lock);
-    QPID_LOG(error, *this << " error sending state dump: " << e.what());    
+    QPID_LOG(error, *this << " error sending dump: " << e.what());    
     dumpOutDone(l);
 }
 
@@ -529,9 +531,9 @@
     return Manageable::STATUS_OK;
 }    
 
-void Cluster::stopClusterNode(Lock&) {
+void Cluster::stopClusterNode(Lock& l) {
     QPID_LOG(notice, *this << " stopped by admin");
-    leave();
+    leave(l);
 }
 
 void Cluster::stopFullCluster(Lock& l) {
@@ -541,27 +543,27 @@
 
 void Cluster::memberUpdate(Lock& l) {
     QPID_LOG(debug, *this << " member update, map=" << map);
-    std::vector<Url> vectUrl = getUrls(l);
-    size_t size = vectUrl.size();
-
-    failoverExchange->setUrls(vectUrl);
+    std::vector<Url> urls = getUrls(l);
+    size_t size = urls.size();
+    failoverExchange->setUrls(urls);
+
+    if (size == 1 && lastSize > 1 && state >= READY) { 
+        QPID_LOG(info, *this << " last broker standing, update queue policies");
+        lastBroker = true;
+        broker.getQueues().updateQueueClusterState(true);
+    }
+    else if (size > 1 && lastBroker) {
+        QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+        lastBroker = false;
+        broker.getQueues().updateQueueClusterState(false);
+    }
+    lastSize = size;
 
     if (mgmtObject) {
-
-        if (lastSize != size && size == 1){
-            QPID_LOG(info, *this << " last node standing, updating queue policies.");
-            broker.getQueues().updateQueueClusterState(true);
-        }
-        else if (lastSize != size && size > 1) {
-            QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size);
-            broker.getQueues().updateQueueClusterState(false);
-        }
-        lastSize = size;
-		
         mgmtObject->set_clusterSize(size); 
         string urlstr;
-        for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
-            if (iter != vectUrl.begin()) urlstr += "\n";
+        for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
+            if (iter != urls.begin()) urlstr += "\n";
             urlstr += iter->str();
         }
         mgmtObject->set_members(urlstr);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Nov 26 09:37:16 2008
@@ -213,6 +213,7 @@
     boost::optional<ClusterMap> dumpedMap;
     
     size_t lastSize;
+    bool lastBroker;
     boost::shared_ptr<FailoverExchange> failoverExchange;
 
     Quorum quorum;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp Wed Nov 26 09:37:16 2008
@@ -31,7 +31,7 @@
     handle = ::dlopen(name, RTLD_NOW);
     const char* error = ::dlerror();
     if (error) {
-        throw Exception(QPID_MSG(error));
+        throw Exception(QPID_MSG(error << ": " << name));
     }
 }
 
@@ -52,7 +52,7 @@
     void* sym = ::dlsym(handle, name);
     const char* error = ::dlerror();
     if (error) 
-        throw Exception(QPID_MSG(error));
+        throw Exception(QPID_MSG(error << ": " << name));
     return sym;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Nov 26 09:37:16 2008
@@ -197,6 +197,10 @@
 
 CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
 
+check_PROGRAMS+=tsxtest
+tsxtest_SOURCES=tsxtest.cpp
+tsxtest_LDADD=$(lib_client) 
+
 # FIXME aconway 2008-05-23: Disabled interop_runner because it uses
 # the obsolete Channel class.  Convert to Session and re-enable.
 # 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Nov 26 09:37:16 2008
@@ -120,6 +120,7 @@
         std::string prefix = os.str();
         const char* argv[] = {
             "qpidd " __FILE__ ,
+            "--no-module-dir",
             "--load-module=../.libs/cluster.so",
             "--cluster-name", name.c_str(), 
             "--auth=no", "--no-data-dir",

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perfdist?rev=720924&r1=720923&r2=720924&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perfdist (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perfdist Wed Nov 26 09:37:16 2008
@@ -10,10 +10,10 @@
 usage: $0 <perftest-args> -- <client-hosts ...> [ --- <broker hosts...> ]
 Client & broker hosts can also be set in env vars CLIENTS and BROKERS.
 
-Run perftest with clients running on the clients and brokers running
-on the specified hosts. Clients are assigned to client hosts round
-robin: publishers first, then subscribers. If there are multiple
-brokers (for cluster tests) clients connect to them round robin.
+Run perftest clients on the client hosts against brokers on the broker
+hosts Clients are assigned to client hosts round robin: publishers
+first, then subscribers. If there are multiple brokers (for cluster
+tests) clients connect to them round robin.
 
 Broker hosts can be listed with -b in perftest-args or after ---
 at the end of the arguments.