You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/08/06 19:41:19 UTC

svn commit: r801740 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h ClusterPlugin.cpp ClusterSettings.h Cpg.cpp Cpg.h OutputInterceptor.cpp PollerDispatch.cpp PollerDispatch.h Quorum_cman.cpp Quorum_cman.h Quorum_null.h

Author: aconway
Date: Thu Aug  6 17:41:18 2009
New Revision: 801740

URL: http://svn.apache.org/viewvc?rev=801740&view=rev
Log:
Fix cman integration to exit immediately on loss of quorum.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug  6 17:41:18 2009
@@ -190,6 +190,7 @@
                       boost::bind(&Cluster::leave, this),
                       "Error delivering frames",
                       poller),
+    quorum(boost::bind(&Cluster::leave, this)),
     initialized(false),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
@@ -214,7 +215,6 @@
     // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
     broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
 
-    if (settings.quorum) quorum.init();
     cpg.join(name);
     // pump the CPG dispatch manually till we get initialized. 
     while (!initialized)
@@ -226,10 +226,10 @@
 }
 
 void Cluster::initialize() {
+    if (settings.quorum) quorum.start(poller);
     if (myUrl.empty())
         myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
-    QPID_LOG(notice, *this << " member " << self <<  " joining "
-             << name << " with url=" << myUrl);
+    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
@@ -404,6 +404,7 @@
     LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
     LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
     Mutex::ScopedLock l(lock);
+    if (state == LEFT) return;
     EventFrame e(efConst);
     const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
     if (offer && error.isUnresolved()) {
@@ -510,7 +511,7 @@
     const cpg_name */*group*/,
     const cpg_address *current, int nCurrent,
     const cpg_address *left, int nLeft,
-    const cpg_address */*joined*/, int /*nJoined*/)
+    const cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
     if (state == INIT) {        // First config change.
@@ -518,8 +519,11 @@
         broker.setRecovery(nCurrent == 1);
         initialized = true;
     }
-    QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
-             << AddrList(left, nLeft, "left: "));
+    QPID_LOG(notice, *this << " membership change: "
+             << AddrList(current, nCurrent) << "("
+             << AddrList(joined, nJoined, "joined: ")
+             << AddrList(left, nLeft, "left: ")
+             << ")");
     std::string addresses;
     for (const cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
@@ -833,9 +837,9 @@
         "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
     };
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
-    o << "cluster:" << STATE[cluster.state];
+    o << "cluster(" << cluster.self << " " << STATE[cluster.state];
     if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
-    return o;
+    return o << ")";;
 }
 
 MemberId Cluster::getId() const {
@@ -846,14 +850,6 @@
     return broker; // Immutable,  no need to lock.
 }
 
-void Cluster::checkQuorum() {
-    if (!quorum.isQuorate()) {
-        QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
-        leave();
-        throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
-    }
-}
-
 void Cluster::setClusterId(const Uuid& uuid, Lock&) {
     clusterId = uuid;
     if (mgmtObject) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Aug  6 17:41:18 2009
@@ -101,8 +101,6 @@
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
 
-    void checkQuorum();
-
     const ClusterSettings& getSettings() const { return settings; }
 
     void deliverFrame(const EventFrame&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Aug  6 17:41:18 2009
@@ -72,7 +72,7 @@
             ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit  reads per connection. 0=no limit.")
-            // FIXME aconway 2009-05-20: temporary 
+            // TODO aconway 2009-05-20: temporary, remove
             ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
             ;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Thu Aug  6 17:41:18 2009
@@ -37,7 +37,7 @@
     bool checkErrors;
 
     ClusterSettings() : quorum(false), readMax(10),
-                        checkErrors(true) // FIXME aconway 2009-05-20: temporary
+                        checkErrors(true) // TODO aconway 2009-05-20: remove this option.
     {}
   
     Url getUrl(uint16_t port) const {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Aug  6 17:41:18 2009
@@ -105,7 +105,7 @@
 
     QPID_LOG(info, "Initializing CPG");
     cpg_error_t err = cpg_initialize(&handle, &callbacks);
-    int retries = 6;
+    int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection.
     while (err == CPG_ERR_TRY_AGAIN && --retries) {
         QPID_LOG(notice, "Re-trying CPG initialization.");
         sys::sleep(5);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Thu Aug  6 17:41:18 2009
@@ -20,7 +20,6 @@
  */
 
 #include "qpid/Exception.h"
-#include "qpid/cluster/Dispatchable.h"
 #include "qpid/cluster/types.h"
 #include "qpid/sys/IOHandle.h"
 #include "qpid/sys/Mutex.h"

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Aug  6 17:41:18 2009
@@ -46,7 +46,6 @@
 
 void OutputInterceptor::send(framing::AMQFrame& f) {
     LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
-    parent.getCluster().checkQuorum();
     {
         sys::Mutex::ScopedLock l(lock);
         next->send(f);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Thu Aug  6 17:41:18 2009
@@ -33,15 +33,18 @@
                      boost::bind(&PollerDispatch::dispatch, this, _1), // read
                      0,         // write
                      boost::bind(&PollerDispatch::disconnect, this, _1) // disconnect
-      )
+      ),
+      started(false)
 {}
     
 PollerDispatch::~PollerDispatch() {
-    dispatchHandle.stopWatch();
+    if (started)
+        dispatchHandle.stopWatch();
 }
 
 void PollerDispatch::start() {
     dispatchHandle.startWatch(poller);
+    started = true;
 }
 
 // Entry point: called by IO to dispatch CPG events.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h Thu Aug  6 17:41:18 2009
@@ -51,6 +51,7 @@
     boost::shared_ptr<sys::Poller> poller;
     boost::function<void()> onError;
     sys::DispatchHandleRef dispatchHandle;
+    bool started;
 
 
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Thu Aug  6 17:41:18 2009
@@ -18,28 +18,86 @@
  * under the License.
  *
  */
+
 #include "qpid/cluster/Quorum_cman.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Options.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/posix/PrivatePosix.h"
 
 namespace qpid {
 namespace cluster {
 
-Quorum::Quorum() : enable(false), cman(0) {}
+namespace {
+
+boost::function<void()> errorFn;
 
-Quorum::~Quorum() { if (cman) cman_finish(cman); }
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) {
+    if (reason == CMAN_REASON_STATECHANGE && arg == 0) {
+        QPID_LOG(critical, "Lost contact with cluster quorum.");
+        if (errorFn) errorFn();
+        cman_stop_notification(handle);
+    }
+}
+}
+
+Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(0) {
+    errorFn = err;
+}
+
+Quorum::~Quorum() {
+    dispatchHandle.reset();
+    if (cman) cman_finish(cman);
+}
 
-void Quorum::init() {
+void Quorum::start(boost::shared_ptr<sys::Poller> p) {
+    poller = p;
     enable = true;
+    QPID_LOG(debug, "Connecting to quorum service.");
     cman = cman_init(0);
     if (cman == 0) throw ErrnoException("Can't connect to cman service");
     if (!cman_is_quorate(cman)) {
         QPID_LOG(notice, "Waiting for cluster quorum.");
         while(!cman_is_quorate(cman)) sys::sleep(5);
     }
+    int err = cman_start_notification(cman, cmanCallbackFn);
+    if (err != 0) throw ErrnoException("Can't register for cman notifications");
+    watch(getFd());
 }
 
-bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
+void Quorum::watch(int fd) {
+    cmanFd = fd;
+    dispatchHandle.reset(
+        new sys::DispatchHandleRef(
+            sys::PosixIOHandle(cmanFd),
+            boost::bind(&Quorum::dispatch, this, _1), // read
+            0, // write
+            boost::bind(&Quorum::disconnect, this, _1)  // disconnect
+        ));
+    dispatchHandle->startWatch(poller);
+}
+
+int Quorum::getFd() {
+    int fd = cman_get_fd(cman);
+    if (fd == 0) throw ErrnoException("Can't get cman file descriptor");
+    return fd;
+}
+
+void Quorum::dispatch(sys::DispatchHandle&) {
+    try {
+        cman_dispatch(cman, CMAN_DISPATCH_ALL);
+        int fd = getFd();
+        if (fd != cmanFd) watch(fd);
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, "Error in quorum dispatch: " << e.what());
+        errorFn();
+    }
+}
+
+void Quorum::disconnect(sys::DispatchHandle&) {
+    QPID_LOG(critical, "Disconnected from quorum service");
+    errorFn();
+}
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h Thu Aug  6 17:41:18 2009
@@ -22,26 +22,40 @@
  *
  */
 
+#include <qpid/sys/DispatchHandle.h>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <memory>
+
 extern "C" {
 #include <libcman.h>
 }
 
 namespace qpid {
-
-class Options;
+namespace sys {
+class Poller;
+}
 
 namespace cluster {
+class Cluster;
 
 class Quorum {
   public:
-    Quorum();
+    Quorum(boost::function<void ()> onError);
     ~Quorum();
-    void init();
-    bool isQuorate();
+    void start(boost::shared_ptr<sys::Poller>);
     
   private:
+    void dispatch(sys::DispatchHandle&);
+    void disconnect(sys::DispatchHandle&);
+    int getFd();
+    void watch(int fd);
+    
     bool enable;
     cman_handle_t cman;
+    int cmanFd;
+    std::auto_ptr<sys::DispatchHandleRef> dispatchHandle;
+    boost::shared_ptr<sys::Poller> poller;
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h?rev=801740&r1=801739&r2=801740&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h Thu Aug  6 17:41:18 2009
@@ -21,15 +21,20 @@
  * under the License.
  *
  */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+
 namespace qpid {
 namespace cluster {
+class Cluster;
 
 /** Null implementation of quorum. */
 
 class Quorum {
   public:
-    void init() {}
-    bool isQuorate() { return true; }
+    Quorum(boost::function<void ()>) {}
+    void start(boost::shared_ptr<sys::Poller>) {}
 };
 
 #endif  /*!QPID_CLUSTER_QUORUM_NULL_H*/



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org